Coverage for MPP/MPP.py: 86%
436 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-22 12:22 +0200
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-22 12:22 +0200
1import os
2import datetime
3import warnings
4import numpy as np
5import numpy.typing as npt
6import msmhelper as mh
7import mdtraj as md
9from typing import Literal
10from pathlib import Path
11from tqdm import tqdm
12from collections.abc import Iterable
13from sklearn.metrics import davies_bouldin_score
14import pygpcca as gp
16from . import core
17from . import utils
18from . import kernel as kernel_module
19from . import plot
20from .graph import draw_knetwork
23class Lumping(object):
24 """A class to hold all the information and methods for MPP.
26 The most probable path (MPP) algorithm reduces the number of states
27 in a microstate trajectory of a Markovian process in order to
28 simplify its analysis. This two-step algorithm first builds a
29 lumping tree by successively merging the least metastable state
30 together with its most similar other state. In a second step, the
31 lumping tree is parsed in reverse order (from the root) in order to
32 partition the microstates into macrostates, which satisfy a minimum
33 metastability and a minimum population criterion. For details, see
34 the reference publication (see below).
36 Attributes
37 ----------
38 trajectory : ndarray of int, shape (N,)
39 The microstate trajectory. N is the number of frames.
40 lagtime : int
41 Lagtime used, in frames.
42 frame_length : float
43 Length of a frame in ns. (default 0.2)
44 pop_thr : float
45 Population threshold for macrostates. (default 0.005)
46 q_min : float
47 Minimum metastability of macrostates. (default 0.5)
48 limits : list of int
49 If the trajectory is composed of several independent
50 simulations, this list contains the lengths of the individual
51 trajectories. (default None)
52 tmat : ndarray of float, shape (n_states, n_states)
53 Transition matrix of the microstate Markov state model.
54 pop : ndarray of int, shape (N,)
55 Population of the microstates in the trajectory. N is the
56 number of microstates.
57 n_states : int
58 Number of microstates.
59 multi_feature_trajectory : ndarray of float, shape (N, M)
60 A feature trajectory of M features and N frames. For example
61 selected contact distances are passed here. Must be the same as
62 for trajectory.
63 mean_feature_trajectory : ndarray of float, shape (N,)
64 The mean feature trajectory of N frames.
65 mean_feature : ndarray of float, shape (N,)
66 The mean feature for each of N microstates.
67 Z : ndarray of float, shape (N-1, 4)
68 The Z matrix defines how the microstates are lumped together.
69 reference : Lumping
70 Instance of the Lumping object with reference parameters, that
71 are, the only the transition probability is utilized as
72 similarity metric.
73 macrostate_assignment : list of (ndarray of bool,
74 shape (Lumping.n_macrostates, Lumping.n_states))
75 Macrostate assignment of the lumpings. Elements, whose indices
76 correspond to an assignment are True, all other elements are
77 False.
78 macrostate_map : list of (ndarray of int, shape (N,))
79 Map arrays of the assignment. The index of the array
80 corresponds to the microstate and the value is the index of the
81 assigned macrostate.
82 macrostate_tmat : list of (ndarray of float, shape (N, N))
83 Transition matrices of the lumpings.
84 macrostate_trajectory : ndarray of int, shape (N, M)
85 Macrostate trajectories. N is the number of runs and M the
86 langhth of the trajectories.
87 n_macrostates : list of int
88 Number of macrostates in each lumping.
90 Citation
91 --------
92 tbd
93 """
95 lagtime: int
96 """The lagtime used, in frames."""
98 contact_threshold: float
99 """Distance below which a feature (e.g. a contact) is considered
100 formed.
101 """
103 pop_thr: float
104 """Minimum population allowed for macrostates."""
106 q_min: float
107 """Minimum metastability allowed for macrostates."""
109 limits: list[int]
110 """When concatenated trajectories are used, the lengths of the
111 individual trajectories.
112 """
114 n_states: int
115 """The number of microstates."""
117 quiet: bool
118 """Suppress progress reports."""
120 Z: npt.NDArray[float]
121 """The Z matrix defines how the microstates are lumped together.
123 A 2D array of float, shape (Lumping.n_states-1, 4). The Z matrix
124 defines a lumping and is organized like the Z matrix returned from
125 scipy.cluster.hierarchy.linkage. Each row describes a merging of
126 two states in the first step of the MPP algorithm. The first two
127 columns contain the state indices, which are merged. The third
128 column holds the metastability of the less stable state (the state
129 in the first column) and the last column provides the population of
130 the new state, which has index Lumping.n_states + i, where i is the
131 row index.
132 """
134 xtc_stride: float
135 """Use only every Lumping.xtc_stride frame of the xtc trajectory."""
137 plot: "Plotter"
138 """Produce various plots of the lumping."""
140 macrostate_feature: list[list[float]]
141 """Mean macrostate feature."""
143 macrostate_multi_feature: list[list[npt.NDArray[np.floating]]]
144 """Macrostate features."""
146 macrostate_assignment: list[npt.NDArray[bool]]
147 """Macrostate assignments."""
149 macrostate_map: list[npt.NDArray]
150 # macrostate_tmat = []
151 # macrostate_trajectory = np.zeros(
152 # n_macrostates = []
154 def __init__(
155 self,
156 trajectory: npt.NDArray[np.int_],
157 lagtime: int,
158 feature_trajectory: npt.NDArray[np.floating] | None = None,
159 contact_threshold: float = 0.45,
160 pop_thr: float = 0.005,
161 q_min: float = 0.5,
162 frame_length: float = 0.2,
163 limits: list[int] | None = None,
164 quiet: bool = False,
165 ) -> None:
166 """Initialize a Lumping object.
168 Parameters
169 ----------
170 trajectory : ndarray of int, shape (N,)
171 The microstate trajectory. N is the number of frames.
172 lagtime : int
173 Lag time used.
174 feature_trajectory : ndarray of float, shape (N, M)
175 A feature trajectory of M features and N frames. For
176 example selected contact distances are passed here. Must
177 be the same as for trajectory. If None, a moch array of
178 ones is created. (default None)
179 contact_threshold : float
180 Distance in feature space below which the interaction is
181 considered positive, e.g. a contact distance below which a
182 contact is considered formed. (default 0.45)
183 pop_thr : float
184 Population threshold for macrostates. (default 0.005)
185 q_min : float
186 Minimum metastability of macrostates. (default 0.5)
187 frame_length : float
188 Length of a frame in ns. (default 0.2)
189 limits : list of int
190 If the trajectory is composed of several independent
191 simulations, this list contains the lengths of the
192 individual trajectories. (default None)
193 quiet : bool
194 Whether to hide progress reports or not. (default False)
195 """
196 self.trajectory = trajectory
197 self.frame_length = frame_length
198 self.lagtime = lagtime
199 self.pop_thr = pop_thr
200 self.q_min = q_min
201 self.limits = limits
202 self.tmat, states = mh.msm.estimate_markov_model(
203 utils.get_multi_state_trajectory(self.trajectory, self.limits),
204 self.lagtime,
205 )
206 _, self.pop = np.unique(self.trajectory, return_counts=True)
207 self.n_states = len(states)
208 self.quiet = quiet
209 if feature_trajectory is not None: 209 ↛ 213line 209 didn't jump to line 213 because the condition on line 209 was always true
210 self.contact_threshold = contact_threshold
211 self._add_feature(feature_trajectory)
212 else:
213 self._add_feature(np.ones((trajectory.shape, 1)))
215 self.Z = None
216 self._timescales = None
217 self._linkage = None
218 self._macrostate_population = None
219 self._tree = None
220 self._shannon_entropy = None
221 self._davies_bouldin_index = None
222 self._gmrq = None
223 self._reference = None
224 self._topology_file = None
225 self._xtc_trajectory_file = None
226 self._rmsd = None
227 self._n_i = None
228 self._macro_micro_feature = None
229 self.xtc_stride = None
230 self.plot = Plotter(self)
232 def _add_feature(
233 self,
234 feature_trajectory: npt.NDArray[np.floating],
235 ):
236 """Add feature data to the instance.
238 feature_trajectory : ndarray of float, shape (N, M)
239 A feature trajectory of M features and N frames. For
240 example selected contact distances are passed here. Must
241 be the same as for trajectory.
242 """
243 if feature_trajectory.shape[0] != self.trajectory.shape[0]: 243 ↛ 244line 243 didn't jump to line 244 because the condition on line 243 was never true
244 raise ValueError(
245 (
246 "feature_trajectory must have the same length as the microstate "
247 "trajectory (mpp.trajectory)"
248 )
249 )
250 if feature_trajectory.ndim == 2: 250 ↛ 253line 250 didn't jump to line 253 because the condition on line 250 was always true
251 self.multi_feature_trajectory = feature_trajectory
252 else:
253 raise ValueError("feature_trajectory must be 2 D")
255 self.multi_feature_trajectory_bool = (
256 self.multi_feature_trajectory < self.contact_threshold
257 )
258 self.mean_feature_trajectory = self.multi_feature_trajectory_bool.mean(axis=1)
259 self.mean_feature = np.zeros(self.n_states)
260 for i in range(self.n_states):
261 self.mean_feature[i] = self.mean_feature_trajectory[
262 self.trajectory == i
263 ].mean()
265 def run_mpp(
266 self,
267 kernel: kernel_module.LumpingKernel = kernel_module.LumpingKernel(),
268 feature_kernel: kernel_module.FeatureKernel | None = None,
269 n: int = 1,
270 ) -> None:
271 """Run the MPP algorithm.
273 Parameters
274 ----------
275 kernel : LumpingKernel
276 An instance of the LumpingKernel object, which determines
277 the similarity metric employed.
278 (default kernel_module.LumpingKernel())
279 feature_kernel : FeatureKernel
280 An instance of the FeatureKernel object, which determines
281 the similarity metric for the feature. None to disable
282 feature incorporation. (default None)
283 n : int
284 Count of runs for a stochastic lumping kernel. (default 1)
285 """
286 self.n_runs = n
287 self.kernel = kernel
288 self.feature_kernel = feature_kernel
290 self.Z = np.zeros((self.n_runs, self.n_states - 1, 4), dtype=np.float64)
291 self.full_pop = np.zeros((self.n_runs, 2 * self.n_states - 1), dtype=np.uint32)
292 if self.quiet: 292 ↛ 295line 292 didn't jump to line 295 because the condition on line 292 was always true
293 iter = range(self.n_runs)
294 else:
295 print("Clustering ...")
296 iter = tqdm(range(self.n_runs))
297 for i in iter:
298 self.Z[i], self.full_pop[i] = core.cluster(
299 self.tmat,
300 self.pop,
301 kernel=self.kernel,
302 feature_kernel=self.feature_kernel,
303 )
304 self.assign_macrostates()
306 def assign_macrostates(self) -> None:
307 """Assign macrostates and provide macrostate data."""
308 self.macrostate_feature = []
309 self.macrostate_multi_feature = []
310 self.macrostate_assignment = []
311 self.macrostate_map = []
312 self.macrostate_tmat = []
313 self.macrostate_trajectory = np.zeros(
314 (self.n_runs, self.trajectory.shape[0]), dtype=self.trajectory.dtype.type
315 )
316 self.n_macrostates = []
318 if self.quiet: 318 ↛ 321line 318 didn't jump to line 321 because the condition on line 318 was always true
319 iter = range(self.n_runs)
320 else:
321 print("Assigning macrostates ...")
322 iter = tqdm(range(self.n_runs))
323 for n_i in iter:
324 self.macrostate_assignment.append(
325 utils.get_macrostate_assignment_from_tree(self.tree[n_i])
326 )
328 # Calculate other macrostate related values
329 self.macrostate_map.append(
330 np.zeros(self.n_states, dtype=self.trajectory.dtype.type)
331 )
332 mas, mis = np.where(self.macrostate_assignment[-1] == 1)
333 self.macrostate_map[-1][mis] = mas
334 self.macrostate_tmat.append(
335 utils.macrostate_tmat(
336 self.tmat, self.macrostate_assignment[-1], self.pop
337 )
338 )
339 self.macrostate_trajectory[n_i] = utils.translate_trajectory(
340 self.trajectory, self.macrostate_map[-1]
341 )
342 self.n_macrostates.append(self.macrostate_assignment[-1].shape[0])
343 self.macrostate_feature.append(
344 [
345 self.mean_feature_trajectory[
346 np.where(self.macrostate_trajectory[n_i] == i)
347 ].mean()
348 for i in np.arange(self.n_macrostates[-1])
349 ]
350 )
351 self.macrostate_multi_feature.append(
352 [
353 self.multi_feature_trajectory_bool[
354 np.where(self.macrostate_trajectory[n_i] == i)
355 ].mean(axis=0)
356 for i in np.arange(self.n_macrostates[-1], dtype=int)
357 ]
358 )
360 def gpcca(self, n_macrostates: int | None = None) -> None:
361 """Instead of MPP algorithm use GPCCA algorithm for lumping.
363 Generalized Perron Cluster Cluster Analysis (GPCCA) is used
364 instead of the MPP algorithm. GPCCA requires the definition of
365 a number of macrostates to cluster the microstates into. For
366 comparison, the number of macrostates in the reference lumping
367 may be a good choice. This method is implemented for reference.
369 Parameters
370 ----------
371 n_macrostates : int
372 Number of macrostates. If None, employ the same number of
373 macrostates as yielded from the reference lumping.
374 (default None)
375 """
376 if n_macrostates is None: 376 ↛ 377line 376 didn't jump to line 377 because the condition on line 376 was never true
377 n_macrostates = self.reference.n_macrostates[0]
378 self.gpcca = gp.GPCCA(self.tmat, method="krylov")
379 self.gpcca.optimize(n_macrostates)
381 self.n_runs = 1
382 self.n_macrostates = [n_macrostates]
384 gma = self.gpcca.macrostate_assignment
385 gmt = np.zeros(self.trajectory.shape, dtype=self.trajectory.dtype)
386 gmf = np.empty(self.n_macrostates[0])
387 for i in range(self.n_macrostates[0]):
388 gmt[np.where(np.isin(self.trajectory, np.where(gma == i)[0]))[0]] = i + 1
389 gmf[i] = self.mean_feature_trajectory[gmt == i + 1].mean()
391 order = np.argsort(gmf)[::-1]
392 new_states = np.empty(self.n_macrostates[0], dtype=self.trajectory.dtype.type)
393 new_states[order] = np.arange(
394 self.n_macrostates[0], dtype=self.trajectory.dtype.type
395 )
396 self.macrostate_map = [np.empty(gma.shape, dtype=self.trajectory.dtype.type)]
397 for i in range(self.n_macrostates[0]):
398 self.macrostate_map[0][np.where(gma == i)] = new_states[i]
400 self.macrostate_assignment = [
401 np.full((self.n_macrostates[0], self.macrostate_map[0].shape[0]), False)
402 ]
403 self.macrostate_assignment[0][
404 self.macrostate_map[0],
405 np.arange(self.macrostate_map[0].shape[0], dtype=int),
406 ] = True
407 self.macrostate_feature = [gmf[order]]
408 self.macrostate_trajectory = np.empty(
409 (self.n_runs, self.trajectory.shape[0]), dtype=self.trajectory.dtype.type
410 )
411 self.macrostate_trajectory[0] = utils.translate_trajectory(
412 self.trajectory, self.macrostate_map[0]
413 )
414 self.macrostate_tmat = [
415 utils.macrostate_tmat(self.tmat, self.macrostate_assignment[0], self.pop)
416 ]
418 # Create mock Z and mock full_pop for Sankey plot
419 # After implementation remove mock Z.npy file in run.py
420 self.Z = np.zeros((self.n_runs, self.n_states - 1, 4), dtype=np.float64)
421 self.full_pop = np.zeros((self.n_runs, 2 * self.n_states - 1), dtype=np.uint32)
422 self.full_pop[0, : self.n_states] = self.pop
424 last_merged = self.n_states
425 merge = 0
426 for macrostate in range(self.n_macrostates[0]):
427 microstates = np.where(self.macrostate_map[0] == macrostate)[0]
428 origin = microstates[0]
429 if microstates.shape[0] > 1:
430 for target in microstates[1:]:
431 intermediate_state = self.n_states + merge
432 self.full_pop[0, intermediate_state] = self.full_pop[
433 0, [origin, target]
434 ].sum()
435 self.Z[0, merge] = (
436 origin,
437 target,
438 0.2,
439 self.full_pop[0, intermediate_state],
440 )
441 origin = intermediate_state
442 merge += 1
444 if macrostate > 0:
445 intermediate_state = self.n_states + merge
446 target = last_merged
447 self.full_pop[0, intermediate_state] = self.full_pop[
448 0, [origin, target]
449 ].sum()
450 self.Z[0, merge] = (
451 origin,
452 target,
453 0.9,
454 self.full_pop[0, intermediate_state],
455 )
456 last_merged = intermediate_state
457 merge += 1
458 else:
459 last_merged = origin
461 self.tree
462 self.pop_thr = 0
463 self.q_min = 0.5
465 def save_macrostate_trajectory(self, out: Path, one_based: bool = False) -> None:
466 """Write macrostate trajectory to a text file."""
467 header = (
468 f"# Created by Lumping class\n"
469 f"# Time: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
470 f"# Trajectory contains {self.n_macrostates[self.n_i]} states and {self.macrostate_trajectory.shape[1]} frames.\n"
471 f"# Trajectory index: {self.n_i}\n"
472 )
473 macrostate_trajectory = self.macrostate_trajectory[self.n_i]
474 if one_based:
475 macrostate_trajectory += 1
476 np.savetxt(out, macrostate_trajectory, fmt="%.0f", header=header)
478 def save_Z(self, out: Path, n_i: int | Iterable | Literal["all"] = "all") -> None:
479 """Save Z matrix.
481 Parameters
482 ----------
483 out : Path
484 Where to save the Z matrix.
485 n_i : int or iterable of int or {"all"}
486 Which lumpings to save.
487 """
488 if not out.endswith(".npy"): 488 ↛ 489line 488 didn't jump to line 489 because the condition on line 488 was never true
489 out += ".npy"
491 if n_i == "all": 491 ↛ 493line 491 didn't jump to line 493 because the condition on line 491 was always true
492 np.save(out, self.Z)
493 elif isinstance(n_i, Iterable):
494 np.save(out, self.Z[n_i])
495 elif isinstance(n_i, int):
496 np.save(out, self.Z[n_i : n_i + 1])
497 else:
498 raise ValueError("n_i must be 'all', Iterable or int.")
500 def load_Z(self, Z):
501 """Load Z matrix."""
502 if isinstance(Z, np.ndarray): 502 ↛ 503line 502 didn't jump to line 503 because the condition on line 502 was never true
503 self.Z = Z
504 elif os.path.exists(Z): 504 ↛ 507line 504 didn't jump to line 507 because the condition on line 504 was always true
505 self.Z = np.load(Z)
506 else:
507 raise ValueError("Z must be a numpy array or a .npy file.")
509 self.n_runs = self.Z.shape[0]
510 # n: number of macrostates
511 tmat, states = mh.msm.estimate_markov_model(
512 utils.get_multi_state_trajectory(self.trajectory, self.limits),
513 self.lagtime,
514 )
515 self.tmat = tmat.astype(float)
516 _, self.pop = np.unique(self.trajectory, return_counts=True)
517 self.n_states = len(states)
518 self.full_pop = np.zeros((self.n_runs, 2 * self.n_states - 1), dtype=np.uint32)
519 self.full_pop[:, : self.n_states] = self.pop
520 self.full_pop[:, self.n_states :] = self.Z[:, :, 3]
522 self.assign_macrostates()
524 def save_rmsd(self, out):
525 """Save RMSD of states to numpy file."""
526 np.save(out, self.rmsd)
528 def load_rmsd(self, f_name):
529 """Save RMSD of states from numpy file."""
530 self._rmsd = np.load(f_name)
532 def draw_random_frames_indices(
533 self, out: Path | None = None, n: int = 20
534 ) -> npt.NDArray[np.int_] | None:
535 """Draw n random frame indices for each macrostate.
537 Parameters
538 ----------
539 out : Path, optional
540 Directory where to save the .ndx files. If None, return
541 indices instead of being saved. (default None)
542 n : int
543 Number of random frame indices to draw. (default 20)
545 Returns
546 -------
547 ndarray of int, shape (N,) or None
548 N random frame indices if out is None, otherwise retutrns
549 None.
550 """
551 drawn_frames = np.empty((self.n_macrostates[self.n_i], n), dtype=int)
552 for state in np.arange(self.n_macrostates[self.n_i]):
553 frames_in_state = np.where(self.macrostate_trajectory[self.n_i] == state)[0]
554 drawn_frames[state] = np.random.choice(
555 frames_in_state, size=n, replace=False
556 )
557 if self.xtc_stride is not None:
558 drawn_frames *= self.xtc_stride
559 if out is not None:
560 # Path(os.path.join(out)).mkdir(parents=True, exist_ok=True)
561 out.mkdir(parents=True, exist_ok=True)
562 for s, i in enumerate(drawn_frames):
563 np.savetxt(
564 os.path.join(out, f"{s + 1:02d}.ndx"),
565 i,
566 fmt="%.0f",
567 header="[frames]",
568 )
569 else:
570 return drawn_frames
572 def draw_random_frames(self, out: Path, n: int = 20) -> None:
573 """Draw n random frames per macrostate and write pdb files.
575 Parameters
576 ----------
577 out : Path
578 Directory where to save the .pdb files.
579 n : int
580 Number of random frame indices to draw. (default 20)
581 """
582 for state in np.arange(self.n_macrostates[self.n_i]):
583 frames_in_state = np.where(self.macrostate_trajectory[self.n_i] == state)[0]
584 drawn_frames = np.random.choice(frames_in_state, size=n, replace=False)
585 for i, frame in enumerate(drawn_frames):
586 f = md.load_xtc(
587 self.xtc_trajectory_file,
588 top=self.topology_file,
589 frame=frame,
590 )
591 f.save_pdb(os.path.join(out, f"S{state}_{i:02d}.pdb"))
593 def get_best_defined_contacts(self, n: int = 3) -> npt.NDArray[np.int_]:
594 """Return the feature indices with the least variance.
596 Calculate the variance for each feature for each macrostete for
597 lumping n_i and return the indices for features with the least
598 variance.
600 Parameters
601 ----------
602 n : int
603 Number of features to return. (default 3)
605 Returns
606 -------
607 ndarray of int, shape (N, M)
608 Indices of least varying features. N is the number of
609 macrostates and M is the number of features required
610 (parameter n).
611 """
612 contacts = np.zeros((self.n_macrostates[self.n_i], n), dtype=int)
613 for i in range(self.n_macrostates[self.n_i]):
614 contacts[i] = np.argsort(
615 np.var(
616 self.multi_feature_trajectory[
617 self.macrostate_trajectory[self.n_i] == i
618 ],
619 axis=0,
620 )
621 )[:n]
622 return contacts
624 def get_least_moving_residues(
625 self, contact_index_file: Path, n: int = 3
626 ) -> list[npt.NDArray[np.int_]]:
627 """Return residue indices of least varying feature.
629 Parameters
630 ----------
631 contact_index_file : Path
632 Path to contact index file. Each line contains the two
633 residue indices space separated, which are part of the
634 feature. The line index corresponds to the feature index.
635 n : int
636 Number of features to consider. (default 3)
638 Returns
639 -------
640 list of ndarray of int
641 One list entry per macrostate. The array contains the
642 indices of the features with least variance for the
643 respective macrostate.
644 """
645 contact_indices = np.loadtxt(contact_index_file, dtype=int)
646 contacts = self.get_best_defined_contacts(n)
647 least_moving_residues = []
648 for c in contacts:
649 least_moving_residues.append(np.unique(contact_indices[c].flatten()))
650 return least_moving_residues
652 def write_least_moving_residues(
653 self, contact_index_file: Path, out: Path, n: int = 3
654 ) -> None:
655 """Write indices of least moving residues to an index file.
657 Parameters
658 ----------
659 contact_index_file : Path
660 Path to contact index file. Each line contains the two
661 residue indices space separated, which are part of the
662 feature. The line index corresponds to the feature index.
663 out : Path
664 Target file for the residue indices. Each line holds the
665 residue indices for one macrostate in a space separated
666 list
667 n : int
668 Number of features to consider. (default 3)
669 """
670 if contact_index_file != "none": 670 ↛ 678line 670 didn't jump to line 678 because the condition on line 670 was always true
671 least_moving_residues = self.get_least_moving_residues(
672 contact_index_file, n=n
673 )
674 with open(out, "w") as f:
675 for residues in least_moving_residues:
676 f.write(f"{' '.join(residues.astype(str))}\n")
677 else:
678 with open(out, "w") as f:
679 f.write("")
681 def __add__(
682 self, other: "Lumping"
683 ) -> tuple["Lumping", "Lumping", npt.NDArray[np.floating]]:
684 """Return the state overlap between two lumpings.
686 This is mostly utilized to analyze the similarity of stochastic
687 lumpings to a deterministic one.
689 Parameters
690 ----------
691 other : Lumping
692 Another instance of Lumping with which to compare this
693 Lumping instance.
695 Returns
696 -------
697 tuple of (
698 Lumping,
699 Lumping,
700 (ndarray of float, shape (3, N, M)),
701 )
702 The first Lumping object is the deterministic lumping to
703 which the stochastic lumping (second Lumping instance) is
704 compared. The array contains the results: First comes the
705 union, second the fraction in reference macrostates and
706 third the fraction in stochastic lumpings. N is the number
707 of macrostates of the reference lumping and M the number of
708 stochastic lumpings. The reference is not forcibly
709 Lumping.reference, but can be any deterministic Lumping
710 instance.
711 """
712 if self.n_runs == 1 and other.n_runs >= 1: 712 ↛ 714line 712 didn't jump to line 714 because the condition on line 712 was never true
713 # reference
714 ref = self
715 # stochastic lumping
716 sto = other
717 elif other.n_runs == 1 and self.n_runs >= 1: 717 ↛ 721line 717 didn't jump to line 721 because the condition on line 717 was always true
718 ref = other
719 sto = self
720 else:
721 raise ValueError("The reference lumping must have exactly one run.")
722 return ref, sto, utils.similarity(ref, sto)
724 @property
725 def reference(self) -> "Lumping":
726 """The reference lupming of the system.
728 Only the transition probability is utilized to estimate the
729 state similarity in the lumping process.
731 Returns
732 -------
733 Lumping
734 An instance of Lumping with reference parameters.
735 """
736 if self._reference is None: 736 ↛ 749line 736 didn't jump to line 749 because the condition on line 736 was always true
737 k = kernel_module.LumpingKernel()
738 self._reference = Lumping(
739 self.trajectory,
740 self.lagtime,
741 self.multi_feature_trajectory,
742 contact_threshold=self.contact_threshold,
743 pop_thr=self.pop_thr,
744 q_min=self.q_min,
745 limits=self.limits,
746 quiet=True,
747 )
748 self._reference.run_mpp(k)
749 return self._reference
751 @property
752 def n_i(self) -> int:
753 """Index of the lumping under consideration.
755 0 for deterministic lumpings. Is set to the lumping with the
756 longest first implied timescale, if not set manually.
758 Returns
759 -------
760 int
761 Index of the lumping under consideration.
762 """
763 if self._n_i is None:
764 if self.n_runs > 1: 764 ↛ 765line 764 didn't jump to line 765 because the condition on line 764 was never true
765 self._n_i = np.argmax(self.timescales[:, 0])
766 else:
767 self._n_i = 0
768 return self._n_i
770 @n_i.setter
771 def n_i(self, value: int) -> None:
772 if not isinstance(value, int):
773 raise ValueError("n_i must be an integer")
774 self._n_i = value
776 @property
777 def timescales(self) -> npt.NDArray[np.floating]:
778 """Implied timescales property.
780 If more than the first 3 implied timescales are required, run
781 the calc_timescales method separately.
783 Returns
784 -------
785 ndarray of float, shape (N, M)
786 First M implied timescales for all N runs.
787 """
788 if self._timescales is None: 788 ↛ 790line 788 didn't jump to line 790 because the condition on line 788 was always true
789 self.calc_timescales()
790 return self._timescales
792 def calc_timescales(self, ntimescales: int = 3) -> None:
793 """Calculate implied timescales.
795 Parameters
796 ----------
797 ntimescales : int
798 Number of first implied timescales to calculate.
799 """
800 self._timescales = np.zeros((self.n_runs, ntimescales))
801 for i, trajectory in enumerate(self.macrostate_trajectory):
802 self._timescales[i, :] = mh.msm.implied_timescales(
803 utils.get_multi_state_trajectory(trajectory, self.limits),
804 [self.lagtime],
805 ntimescales=ntimescales,
806 )[0]
808 @property
809 def linkage(self) -> npt.NDArray[np.floating]:
810 """The linkage matrix.
812 The linkage matrix is derived from the Z matrix and utilized in
813 former implementations of the MPP algorithm.
815 Returns
816 -------
817 ndarray of float, shape (N-1, 3)
818 For a system of N microstates, each line defines a merging.
819 The state in the first column is merged with the state in
820 the second column and the new state has the index of the
821 second state, which is the more stable one. The
822 metastability of the first state is stored in the third
823 column.
824 """
825 if self._linkage is None:
826 self._linkage = utils.Z_to_linkage(self.Z[self.n_i])
827 return self._linkage
829 @property
830 def macrostate_population(self) -> list[npt.NDArray[np.int_]]:
831 """Return the macrostate populations for all runs."""
832 if self._macrostate_population is None:
833 self._macrostate_population = []
834 for j, ma in enumerate(self.macrostate_assignment):
835 self._macrostate_population.append(
836 np.zeros(ma.shape[0], dtype=self.full_pop.dtype.type)
837 )
838 for i, m in enumerate(ma):
839 self._macrostate_population[-1][i] = self.full_pop[
840 j, : self.n_states
841 ][m.astype(bool)].sum()
842 return self._macrostate_population
844 @property
845 def rmsd(self) -> npt.NDArray[np.floating]:
846 """Return the root mean square deviation (RMSD) of C-alphas.
848 The RMSD of C-alphas is calculate so that it is minimal. The
849 mean_frames attribute holds the mdtraj objects of the frames,
850 which minimizes the RMSD, for each macrostate.
852 Returns
853 -------
854 ndarray of float, shape (N, M)
855 C-alpha RMSD values for N macrostates and M C-alpha atoms.
856 """
857 if self._rmsd is None:
858 self._rmsd, self.mean_frames = utils.calc_rmsd(self, quiet=self.quiet)
859 return self._rmsd
861 def rmsd_sharpness(self) -> float:
862 """Returns the RMSD sharpness of a lumping.
864 The RMSD sharpness is given by the population weighted mean of
865 mean RMSDs:
866 s = sum_j(mean_i(rmsd_ij) * p_j) / sum_j(p_j)
867 for C-alpha i, macrostate j and population p.
869 Returns
870 -------
871 float
872 RMSD sharpness value
874 See Also
875 --------
876 Lumping.rmsd : RMSD property.
877 """
878 return (
879 self.rmsd.mean(axis=1) * self.macrostate_population[self.n_i]
880 ).sum() / self.macrostate_population[self.n_i].sum()
882 @property
883 def shannon_entropy(self) -> npt.NDArray[np.floating]:
884 """The Shannon entropy.
886 The Shannon entropy is employed to measure how evenly the
887 macrostate are populated in a lumping. The Shannon entropy
888 penalizes in this case very low puplated macrostates. The lower
889 the entropy, the more even the population partition of the
890 macrostates.
892 Returns
893 -------
894 ndarray of float, shape (Lumping.n_i,)
895 Array contains the Shannon entropy for each run.
897 References
898 ----------
899 [1] Claude C. Shannon, "A Mathematical Theory of Communication",
900 The Bell System Technical Journal, Volume 27, Issue 3, 1948,
901 DOI: 10.1002/j.1538-7305.1948.tb01338.x
902 """
903 if self._shannon_entropy is None: 903 ↛ 907line 903 didn't jump to line 907 because the condition on line 903 was always true
904 self._shannon_entropy = np.zeros(self.n_runs)
905 for i, pop in enumerate(self.macrostate_population):
906 self._shannon_entropy[i] = utils.shannon_entropy(pop)
907 return self._shannon_entropy
909 @property
910 def davies_bouldin_index(self) -> npt.NDArray[np.floating]:
911 """The davies bouldin index.
913 Returns
914 -------
915 ndarray of float, shape (Lumping.n_i,)
916 Array contains the Davies Bouldin index for each run.
918 References
919 ----------
920 [1] David L. Davies; Donald W. Bouldin, "A Cluster Separation
921 Measure", IEEE Transactions on Pattern Analysis and Machine
922 Intelligence, Volume PAMI-1, Issue 2, pp. 224-227, 1979,
923 DOI: 10.1109/TPAMI.1979.4766909
924 """
925 if self._davies_bouldin_index is None: 925 ↛ 931line 925 didn't jump to line 931 because the condition on line 925 was always true
926 self._davies_bouldin_index = np.zeros(self.n_runs)
927 for i in range(self.n_runs):
928 self._davies_bouldin_index[i] = davies_bouldin_score(
929 self.multi_feature_trajectory, self.macrostate_trajectory[i]
930 )
931 return self._davies_bouldin_index
933 @property
934 def gmrq(self) -> npt.NDArray[np.floating]:
935 """The generalized matrix rayleigh quotient (GMRQ).
937 Returns
938 -------
939 ndarray of float, shape (Lumping.n_i,)
940 Array contains the GMRQ for each run.
942 References
943 ----------
944 [1] Robert T. McGibbon; Vijay S. Pande, "Variational
945 cross-validation of slow dynamical modes in molecular
946 kinetics", The Journal of Chemical Physics, Volume 142,
947 Issue 12, 2015, DOI: 10.1063/1.4916292
948 """
949 if self._gmrq is None: 949 ↛ 951line 949 didn't jump to line 951 because the condition on line 949 was always true
950 self._gmrq = utils.gmrq(self.macrostate_tmat)
951 return self._gmrq
953 @property
954 def macro_micro_feature(self) -> npt.NDArray[np.floating]:
955 """Assign macrostate feature values to corresponding microstates.
957 This is useful for the analysis of stochastic lumpings.
959 Returns
960 -------
961 ndarray of float, shape (n_states, n_runs)
962 Contains macrostate feature values for each microstate.
964 See Also
965 --------
966 MPP.plot.macro_feature
967 """
968 if self._macro_micro_feature is None: 968 ↛ 978line 968 didn't jump to line 978 because the condition on line 968 was always true
969 self._macro_micro_feature = np.zeros(
970 (self.n_states, self.n_runs),
971 dtype=self.mean_feature_trajectory.dtype.type,
972 )
973 for i, (ma, mf) in enumerate(
974 zip(self.macrostate_assignment, self.macrostate_feature)
975 ):
976 for j, mb in enumerate(ma.astype(bool)):
977 self._macro_micro_feature[mb, i] = mf[j]
978 return self._macro_micro_feature
980 @property
981 def tree(self) -> list[core.BinaryTreeNode]:
982 """The lumping tree.
984 This property holds the lumping trees of all lumpings performed
985 by this object. The root node is stored for each lumping.
988 """
989 if self._tree is None:
990 self._tree = []
991 for z, pop in zip(self.Z, self.full_pop):
992 self._tree.append(self.build_tree(z, pop))
993 return self._tree
995 def build_tree(self, Z, full_pop):
996 """Build tree using BinaryTreeNode and return root"""
997 n = Z.shape[0] + 1
998 nodes = {}
999 for i, (state, target_state, q, pop) in enumerate(Z):
1000 state = int(state)
1001 target_state = int(target_state)
1002 if state not in nodes:
1003 nodes[state] = core.BinaryTreeNode(
1004 state,
1005 self.tmat,
1006 population=full_pop[state],
1007 q=q,
1008 pop_thr=self.pop_thr,
1009 q_min=self.q_min,
1010 )
1011 if target_state not in nodes:
1012 nodes[target_state] = core.BinaryTreeNode(
1013 target_state,
1014 self.tmat,
1015 population=full_pop[target_state],
1016 q=q,
1017 pop_thr=self.pop_thr,
1018 q_min=self.q_min,
1019 )
1020 nodes[n + i] = core.BinaryTreeNode(
1021 n + i,
1022 self.tmat,
1023 q=q,
1024 pop_thr=self.pop_thr,
1025 q_min=self.q_min,
1026 )
1027 nodes[n + i].left = nodes[state]
1028 nodes[n + i].right = nodes[target_state]
1029 for node in nodes[n + i].leaves:
1030 node.feature = self.mean_feature[node.name]
1031 return nodes[n + i]
1033 @property
1034 def trajectory(self) -> npt.NDArray[np.int_]:
1035 """The microstate trajectory - 0-based."""
1036 return self._trajectory
1038 @trajectory.setter
1039 def trajectory(self, value):
1040 if value.min() == 1:
1041 value -= 1
1042 warnings.warn("1-based trajectory was shifted to 0-based.")
1043 if np.unique(value).shape[0] > value.max() + 1: 1043 ↛ 1044line 1043 didn't jump to line 1044 because the condition on line 1043 was never true
1044 raise ValueError("The state numbering in the trajectory is not continuous")
1045 if value.max() < 2**7: 1045 ↛ 1046line 1045 didn't jump to line 1046 because the condition on line 1045 was never true
1046 trajectory_type = np.uint8
1047 elif value.max() < 2**15: 1047 ↛ 1050line 1047 didn't jump to line 1050 because the condition on line 1047 was always true
1048 trajectory_type = np.uint16
1049 else:
1050 trajectory_type = np.uint32
1051 self._trajectory = value.astype(trajectory_type)
1053 @property
1054 def topology_file(self):
1055 """The topology_file property."""
1056 if self._topology_file is None: 1056 ↛ 1057line 1056 didn't jump to line 1057 because the condition on line 1056 was never true
1057 raise ValueError("No topology file set.")
1058 return self._topology_file
1060 @topology_file.setter
1061 def topology_file(self, value):
1062 if os.path.isfile(value): 1062 ↛ 1065line 1062 didn't jump to line 1065 because the condition on line 1062 was always true
1063 self._topology_file = value
1064 else:
1065 raise FileNotFoundError(f"No such file: {value}")
1067 @property
1068 def xtc_trajectory_file(self):
1069 """The xtc_trajectory_file property."""
1070 if self._xtc_trajectory_file is None: 1070 ↛ 1071line 1070 didn't jump to line 1071 because the condition on line 1070 was never true
1071 raise ValueError("No xtc trajectory file set.")
1072 return self._xtc_trajectory_file
1074 @xtc_trajectory_file.setter
1075 def xtc_trajectory_file(self, value):
1076 if os.path.isfile(value): 1076 ↛ 1079line 1076 didn't jump to line 1079 because the condition on line 1076 was always true
1077 self._xtc_trajectory_file = value
1078 else:
1079 raise FileNotFoundError(f"No such file: {value}")
1081 @property
1082 def frame_length(self) -> float:
1083 """The frame length in ns."""
1084 return self._frame_length
1086 @frame_length.setter
1087 def frame_length(self, value):
1088 self._frame_length = value
1091class Plotter:
1092 def __init__(self, obj):
1093 self._obj = obj
1095 def dendrogram(self, out: str, scale=1, offset=0):
1096 """Plot dendrogram"""
1097 plot.plot_tree(
1098 self._obj.tree[self._obj.n_i],
1099 self._obj.macrostate_assignment[self._obj.n_i],
1100 out,
1101 scale=scale,
1102 offset=offset,
1103 )
1105 def implied_timescales(self, out, use_ref=True, scale=1):
1106 """
1107 out: File to write plot
1108 use_ref: If it for reference trajectory should be plotted
1109 scale: scaling factor for plot
1110 """
1111 if use_ref: 1111 ↛ 1112line 1111 didn't jump to line 1112 because the condition on line 1111 was never true
1112 ref_trajectory = self._obj.reference.macrostate_trajectory[0]
1113 else:
1114 ref_trajectory = self._obj.trajectory
1116 macrostate_trajectory = utils.get_multi_state_trajectory(
1117 self._obj.macrostate_trajectory[self._obj.n_i], self._obj.limits
1118 )
1120 dlagtime = max(1, int(1 / self._obj.frame_length))
1121 plot.implied_timescales(
1122 [ref_trajectory, macrostate_trajectory],
1123 np.arange(1, 4.5 * self._obj.lagtime + dlagtime, dlagtime, dtype=int),
1124 out,
1125 frame_length=self._obj.frame_length,
1126 first_ref=True,
1127 scale=scale,
1128 use_ref=use_ref,
1129 ntimescales=self._obj.timescales.shape[1],
1130 )
1132 def macro_feature(self, out, ref=None):
1133 """
1134 Plot histogram of feature distribution.
1136 micro_feature (np.ndarray, NxR): N microstates, R runs, holds feature
1137 values of respective macrostate
1138 out (str): file to save the plot
1139 ref (list[tuple]): list of
1140 - macrostate_assignment
1141 - macrostate_feature
1142 - color
1143 - label
1144 of the clusterings that should be shown explicitly.
1145 """
1146 plot.macro_feature(
1147 self._obj.macro_micro_feature,
1148 out,
1149 self._obj.reference if ref is None else ref,
1150 )
1152 def rmsd(self, out, helices=None):
1153 plot.rmsd(
1154 self._obj.rmsd, self._obj.macrostate_population[self._obj.n_i], helices, out
1155 )
1157 def delta_rmsd(self, out, helices=None):
1158 plot.delta_rmsd(
1159 self._obj.rmsd, self._obj.macrostate_population[self._obj.n_i], helices, out
1160 )
1162 def contact_rep(self, cluster_file, out, scale=1):
1163 plot.contact_rep(
1164 self._obj.multi_feature_trajectory,
1165 cluster_file,
1166 self._obj.macrostate_trajectory[self._obj.n_i],
1167 out,
1168 utils.get_grid_format(self._obj.n_macrostates[self._obj.n_i]),
1169 scale=scale,
1170 )
1172 def relative_implied_timescales(self, out):
1173 plot.relative_implied_timescales(self._obj, out)
1175 def ck_test(self, out):
1176 plot.chapman_kolmogorov(self._obj, out, self._obj.frame_length)
1178 def state_network(self, out):
1179 plot.state_network(self._obj, out)
1181 def stochastic_state_similarity(self, out):
1182 plot.stochastic_state_similarity(self._obj, self._obj.reference, out)
1184 def transition_matrix(self, out):
1185 plot.transition_matrix(self._obj.macrostate_tmat[self._obj.n_i], out)
1187 def transition_time(self, out):
1188 plot.transition_time(
1189 self._obj.macrostate_tmat[self._obj.n_i],
1190 out,
1191 lagtime=self._obj.lagtime,
1192 frame_length=self._obj.frame_length,
1193 )
1195 def graph(self, out, u=0, f=0):
1196 draw_knetwork(
1197 self._obj.macrostate_trajectory[self._obj.n_i],
1198 self._obj.lagtime,
1199 self._obj.mean_feature_trajectory,
1200 out,
1201 u=u,
1202 f=f,
1203 )
1205 def sankey(self, out, ax=None, scale=1):
1206 plot.sankey_diagram(self._obj, self._obj.reference, out, ax=ax, scale=scale)
1208 def macrostate_trajectory(self, out, row_length=0.2):
1209 plot.state_trajectory(
1210 self._obj.macrostate_trajectory[self._obj.n_i],
1211 out,
1212 row_length=row_length,
1213 frame_length=self._obj.frame_length,
1214 )