Coverage for MPP/MPP.py: 86%

436 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-22 12:22 +0200

1import os 

2import datetime 

3import warnings 

4import numpy as np 

5import numpy.typing as npt 

6import msmhelper as mh 

7import mdtraj as md 

8 

9from typing import Literal 

10from pathlib import Path 

11from tqdm import tqdm 

12from collections.abc import Iterable 

13from sklearn.metrics import davies_bouldin_score 

14import pygpcca as gp 

15 

16from . import core 

17from . import utils 

18from . import kernel as kernel_module 

19from . import plot 

20from .graph import draw_knetwork 

21 

22 

23class Lumping(object): 

24 """A class to hold all the information and methods for MPP. 

25 

26 The most probable path (MPP) algorithm reduces the number of states 

27 in a microstate trajectory of a Markovian process in order to 

28 simplify its analysis. This two-step algorithm first builds a 

29 lumping tree by successively merging the least metastable state 

30 together with its most similar other state. In a second step, the 

31 lumping tree is parsed in reverse order (from the root) in order to 

32 partition the microstates into macrostates, which satisfy a minimum 

33 metastability and a minimum population criterion. For details, see 

34 the reference publication (see below). 

35 

36 Attributes 

37 ---------- 

38 trajectory : ndarray of int, shape (N,) 

39 The microstate trajectory. N is the number of frames. 

40 lagtime : int 

41 Lagtime used, in frames. 

42 frame_length : float 

43 Length of a frame in ns. (default 0.2) 

44 pop_thr : float 

45 Population threshold for macrostates. (default 0.005) 

46 q_min : float 

47 Minimum metastability of macrostates. (default 0.5) 

48 limits : list of int 

49 If the trajectory is composed of several independent 

50 simulations, this list contains the lengths of the individual 

51 trajectories. (default None) 

52 tmat : ndarray of float, shape (n_states, n_states) 

53 Transition matrix of the microstate Markov state model. 

54 pop : ndarray of int, shape (N,) 

55 Population of the microstates in the trajectory. N is the 

56 number of microstates. 

57 n_states : int 

58 Number of microstates. 

59 multi_feature_trajectory : ndarray of float, shape (N, M) 

60 A feature trajectory of M features and N frames. For example 

61 selected contact distances are passed here. Must be the same as 

62 for trajectory. 

63 mean_feature_trajectory : ndarray of float, shape (N,) 

64 The mean feature trajectory of N frames. 

65 mean_feature : ndarray of float, shape (N,) 

66 The mean feature for each of N microstates. 

67 Z : ndarray of float, shape (N-1, 4) 

68 The Z matrix defines how the microstates are lumped together. 

69 reference : Lumping 

70 Instance of the Lumping object with reference parameters, that 

71 are, the only the transition probability is utilized as 

72 similarity metric. 

73 macrostate_assignment : list of (ndarray of bool, 

74 shape (Lumping.n_macrostates, Lumping.n_states)) 

75 Macrostate assignment of the lumpings. Elements, whose indices 

76 correspond to an assignment are True, all other elements are 

77 False. 

78 macrostate_map : list of (ndarray of int, shape (N,)) 

79 Map arrays of the assignment. The index of the array 

80 corresponds to the microstate and the value is the index of the 

81 assigned macrostate. 

82 macrostate_tmat : list of (ndarray of float, shape (N, N)) 

83 Transition matrices of the lumpings. 

84 macrostate_trajectory : ndarray of int, shape (N, M) 

85 Macrostate trajectories. N is the number of runs and M the 

86 langhth of the trajectories. 

87 n_macrostates : list of int 

88 Number of macrostates in each lumping. 

89 

90 Citation 

91 -------- 

92 tbd 

93 """ 

94 

95 lagtime: int 

96 """The lagtime used, in frames.""" 

97 

98 contact_threshold: float 

99 """Distance below which a feature (e.g. a contact) is considered 

100 formed. 

101 """ 

102 

103 pop_thr: float 

104 """Minimum population allowed for macrostates.""" 

105 

106 q_min: float 

107 """Minimum metastability allowed for macrostates.""" 

108 

109 limits: list[int] 

110 """When concatenated trajectories are used, the lengths of the 

111 individual trajectories. 

112 """ 

113 

114 n_states: int 

115 """The number of microstates.""" 

116 

117 quiet: bool 

118 """Suppress progress reports.""" 

119 

120 Z: npt.NDArray[float] 

121 """The Z matrix defines how the microstates are lumped together. 

122 

123 A 2D array of float, shape (Lumping.n_states-1, 4). The Z matrix 

124 defines a lumping and is organized like the Z matrix returned from 

125 scipy.cluster.hierarchy.linkage. Each row describes a merging of 

126 two states in the first step of the MPP algorithm. The first two 

127 columns contain the state indices, which are merged. The third 

128 column holds the metastability of the less stable state (the state 

129 in the first column) and the last column provides the population of 

130 the new state, which has index Lumping.n_states + i, where i is the 

131 row index. 

132 """ 

133 

134 xtc_stride: float 

135 """Use only every Lumping.xtc_stride frame of the xtc trajectory.""" 

136 

137 plot: "Plotter" 

138 """Produce various plots of the lumping.""" 

139 

140 macrostate_feature: list[list[float]] 

141 """Mean macrostate feature.""" 

142 

143 macrostate_multi_feature: list[list[npt.NDArray[np.floating]]] 

144 """Macrostate features.""" 

145 

146 macrostate_assignment: list[npt.NDArray[bool]] 

147 """Macrostate assignments.""" 

148 

149 macrostate_map: list[npt.NDArray] 

150 # macrostate_tmat = [] 

151 # macrostate_trajectory = np.zeros( 

152 # n_macrostates = [] 

153 

154 def __init__( 

155 self, 

156 trajectory: npt.NDArray[np.int_], 

157 lagtime: int, 

158 feature_trajectory: npt.NDArray[np.floating] | None = None, 

159 contact_threshold: float = 0.45, 

160 pop_thr: float = 0.005, 

161 q_min: float = 0.5, 

162 frame_length: float = 0.2, 

163 limits: list[int] | None = None, 

164 quiet: bool = False, 

165 ) -> None: 

166 """Initialize a Lumping object. 

167 

168 Parameters 

169 ---------- 

170 trajectory : ndarray of int, shape (N,) 

171 The microstate trajectory. N is the number of frames. 

172 lagtime : int 

173 Lag time used. 

174 feature_trajectory : ndarray of float, shape (N, M) 

175 A feature trajectory of M features and N frames. For 

176 example selected contact distances are passed here. Must 

177 be the same as for trajectory. If None, a moch array of 

178 ones is created. (default None) 

179 contact_threshold : float 

180 Distance in feature space below which the interaction is 

181 considered positive, e.g. a contact distance below which a 

182 contact is considered formed. (default 0.45) 

183 pop_thr : float 

184 Population threshold for macrostates. (default 0.005) 

185 q_min : float 

186 Minimum metastability of macrostates. (default 0.5) 

187 frame_length : float 

188 Length of a frame in ns. (default 0.2) 

189 limits : list of int 

190 If the trajectory is composed of several independent 

191 simulations, this list contains the lengths of the 

192 individual trajectories. (default None) 

193 quiet : bool 

194 Whether to hide progress reports or not. (default False) 

195 """ 

196 self.trajectory = trajectory 

197 self.frame_length = frame_length 

198 self.lagtime = lagtime 

199 self.pop_thr = pop_thr 

200 self.q_min = q_min 

201 self.limits = limits 

202 self.tmat, states = mh.msm.estimate_markov_model( 

203 utils.get_multi_state_trajectory(self.trajectory, self.limits), 

204 self.lagtime, 

205 ) 

206 _, self.pop = np.unique(self.trajectory, return_counts=True) 

207 self.n_states = len(states) 

208 self.quiet = quiet 

209 if feature_trajectory is not None: 209 ↛ 213line 209 didn't jump to line 213 because the condition on line 209 was always true

210 self.contact_threshold = contact_threshold 

211 self._add_feature(feature_trajectory) 

212 else: 

213 self._add_feature(np.ones((trajectory.shape, 1))) 

214 

215 self.Z = None 

216 self._timescales = None 

217 self._linkage = None 

218 self._macrostate_population = None 

219 self._tree = None 

220 self._shannon_entropy = None 

221 self._davies_bouldin_index = None 

222 self._gmrq = None 

223 self._reference = None 

224 self._topology_file = None 

225 self._xtc_trajectory_file = None 

226 self._rmsd = None 

227 self._n_i = None 

228 self._macro_micro_feature = None 

229 self.xtc_stride = None 

230 self.plot = Plotter(self) 

231 

232 def _add_feature( 

233 self, 

234 feature_trajectory: npt.NDArray[np.floating], 

235 ): 

236 """Add feature data to the instance. 

237 

238 feature_trajectory : ndarray of float, shape (N, M) 

239 A feature trajectory of M features and N frames. For 

240 example selected contact distances are passed here. Must 

241 be the same as for trajectory. 

242 """ 

243 if feature_trajectory.shape[0] != self.trajectory.shape[0]: 243 ↛ 244line 243 didn't jump to line 244 because the condition on line 243 was never true

244 raise ValueError( 

245 ( 

246 "feature_trajectory must have the same length as the microstate " 

247 "trajectory (mpp.trajectory)" 

248 ) 

249 ) 

250 if feature_trajectory.ndim == 2: 250 ↛ 253line 250 didn't jump to line 253 because the condition on line 250 was always true

251 self.multi_feature_trajectory = feature_trajectory 

252 else: 

253 raise ValueError("feature_trajectory must be 2 D") 

254 

255 self.multi_feature_trajectory_bool = ( 

256 self.multi_feature_trajectory < self.contact_threshold 

257 ) 

258 self.mean_feature_trajectory = self.multi_feature_trajectory_bool.mean(axis=1) 

259 self.mean_feature = np.zeros(self.n_states) 

260 for i in range(self.n_states): 

261 self.mean_feature[i] = self.mean_feature_trajectory[ 

262 self.trajectory == i 

263 ].mean() 

264 

265 def run_mpp( 

266 self, 

267 kernel: kernel_module.LumpingKernel = kernel_module.LumpingKernel(), 

268 feature_kernel: kernel_module.FeatureKernel | None = None, 

269 n: int = 1, 

270 ) -> None: 

271 """Run the MPP algorithm. 

272 

273 Parameters 

274 ---------- 

275 kernel : LumpingKernel 

276 An instance of the LumpingKernel object, which determines 

277 the similarity metric employed. 

278 (default kernel_module.LumpingKernel()) 

279 feature_kernel : FeatureKernel 

280 An instance of the FeatureKernel object, which determines 

281 the similarity metric for the feature. None to disable 

282 feature incorporation. (default None) 

283 n : int 

284 Count of runs for a stochastic lumping kernel. (default 1) 

285 """ 

286 self.n_runs = n 

287 self.kernel = kernel 

288 self.feature_kernel = feature_kernel 

289 

290 self.Z = np.zeros((self.n_runs, self.n_states - 1, 4), dtype=np.float64) 

291 self.full_pop = np.zeros((self.n_runs, 2 * self.n_states - 1), dtype=np.uint32) 

292 if self.quiet: 292 ↛ 295line 292 didn't jump to line 295 because the condition on line 292 was always true

293 iter = range(self.n_runs) 

294 else: 

295 print("Clustering ...") 

296 iter = tqdm(range(self.n_runs)) 

297 for i in iter: 

298 self.Z[i], self.full_pop[i] = core.cluster( 

299 self.tmat, 

300 self.pop, 

301 kernel=self.kernel, 

302 feature_kernel=self.feature_kernel, 

303 ) 

304 self.assign_macrostates() 

305 

306 def assign_macrostates(self) -> None: 

307 """Assign macrostates and provide macrostate data.""" 

308 self.macrostate_feature = [] 

309 self.macrostate_multi_feature = [] 

310 self.macrostate_assignment = [] 

311 self.macrostate_map = [] 

312 self.macrostate_tmat = [] 

313 self.macrostate_trajectory = np.zeros( 

314 (self.n_runs, self.trajectory.shape[0]), dtype=self.trajectory.dtype.type 

315 ) 

316 self.n_macrostates = [] 

317 

318 if self.quiet: 318 ↛ 321line 318 didn't jump to line 321 because the condition on line 318 was always true

319 iter = range(self.n_runs) 

320 else: 

321 print("Assigning macrostates ...") 

322 iter = tqdm(range(self.n_runs)) 

323 for n_i in iter: 

324 self.macrostate_assignment.append( 

325 utils.get_macrostate_assignment_from_tree(self.tree[n_i]) 

326 ) 

327 

328 # Calculate other macrostate related values 

329 self.macrostate_map.append( 

330 np.zeros(self.n_states, dtype=self.trajectory.dtype.type) 

331 ) 

332 mas, mis = np.where(self.macrostate_assignment[-1] == 1) 

333 self.macrostate_map[-1][mis] = mas 

334 self.macrostate_tmat.append( 

335 utils.macrostate_tmat( 

336 self.tmat, self.macrostate_assignment[-1], self.pop 

337 ) 

338 ) 

339 self.macrostate_trajectory[n_i] = utils.translate_trajectory( 

340 self.trajectory, self.macrostate_map[-1] 

341 ) 

342 self.n_macrostates.append(self.macrostate_assignment[-1].shape[0]) 

343 self.macrostate_feature.append( 

344 [ 

345 self.mean_feature_trajectory[ 

346 np.where(self.macrostate_trajectory[n_i] == i) 

347 ].mean() 

348 for i in np.arange(self.n_macrostates[-1]) 

349 ] 

350 ) 

351 self.macrostate_multi_feature.append( 

352 [ 

353 self.multi_feature_trajectory_bool[ 

354 np.where(self.macrostate_trajectory[n_i] == i) 

355 ].mean(axis=0) 

356 for i in np.arange(self.n_macrostates[-1], dtype=int) 

357 ] 

358 ) 

359 

360 def gpcca(self, n_macrostates: int | None = None) -> None: 

361 """Instead of MPP algorithm use GPCCA algorithm for lumping. 

362 

363 Generalized Perron Cluster Cluster Analysis (GPCCA) is used 

364 instead of the MPP algorithm. GPCCA requires the definition of 

365 a number of macrostates to cluster the microstates into. For 

366 comparison, the number of macrostates in the reference lumping 

367 may be a good choice. This method is implemented for reference. 

368 

369 Parameters 

370 ---------- 

371 n_macrostates : int 

372 Number of macrostates. If None, employ the same number of 

373 macrostates as yielded from the reference lumping. 

374 (default None) 

375 """ 

376 if n_macrostates is None: 376 ↛ 377line 376 didn't jump to line 377 because the condition on line 376 was never true

377 n_macrostates = self.reference.n_macrostates[0] 

378 self.gpcca = gp.GPCCA(self.tmat, method="krylov") 

379 self.gpcca.optimize(n_macrostates) 

380 

381 self.n_runs = 1 

382 self.n_macrostates = [n_macrostates] 

383 

384 gma = self.gpcca.macrostate_assignment 

385 gmt = np.zeros(self.trajectory.shape, dtype=self.trajectory.dtype) 

386 gmf = np.empty(self.n_macrostates[0]) 

387 for i in range(self.n_macrostates[0]): 

388 gmt[np.where(np.isin(self.trajectory, np.where(gma == i)[0]))[0]] = i + 1 

389 gmf[i] = self.mean_feature_trajectory[gmt == i + 1].mean() 

390 

391 order = np.argsort(gmf)[::-1] 

392 new_states = np.empty(self.n_macrostates[0], dtype=self.trajectory.dtype.type) 

393 new_states[order] = np.arange( 

394 self.n_macrostates[0], dtype=self.trajectory.dtype.type 

395 ) 

396 self.macrostate_map = [np.empty(gma.shape, dtype=self.trajectory.dtype.type)] 

397 for i in range(self.n_macrostates[0]): 

398 self.macrostate_map[0][np.where(gma == i)] = new_states[i] 

399 

400 self.macrostate_assignment = [ 

401 np.full((self.n_macrostates[0], self.macrostate_map[0].shape[0]), False) 

402 ] 

403 self.macrostate_assignment[0][ 

404 self.macrostate_map[0], 

405 np.arange(self.macrostate_map[0].shape[0], dtype=int), 

406 ] = True 

407 self.macrostate_feature = [gmf[order]] 

408 self.macrostate_trajectory = np.empty( 

409 (self.n_runs, self.trajectory.shape[0]), dtype=self.trajectory.dtype.type 

410 ) 

411 self.macrostate_trajectory[0] = utils.translate_trajectory( 

412 self.trajectory, self.macrostate_map[0] 

413 ) 

414 self.macrostate_tmat = [ 

415 utils.macrostate_tmat(self.tmat, self.macrostate_assignment[0], self.pop) 

416 ] 

417 

418 # Create mock Z and mock full_pop for Sankey plot 

419 # After implementation remove mock Z.npy file in run.py 

420 self.Z = np.zeros((self.n_runs, self.n_states - 1, 4), dtype=np.float64) 

421 self.full_pop = np.zeros((self.n_runs, 2 * self.n_states - 1), dtype=np.uint32) 

422 self.full_pop[0, : self.n_states] = self.pop 

423 

424 last_merged = self.n_states 

425 merge = 0 

426 for macrostate in range(self.n_macrostates[0]): 

427 microstates = np.where(self.macrostate_map[0] == macrostate)[0] 

428 origin = microstates[0] 

429 if microstates.shape[0] > 1: 

430 for target in microstates[1:]: 

431 intermediate_state = self.n_states + merge 

432 self.full_pop[0, intermediate_state] = self.full_pop[ 

433 0, [origin, target] 

434 ].sum() 

435 self.Z[0, merge] = ( 

436 origin, 

437 target, 

438 0.2, 

439 self.full_pop[0, intermediate_state], 

440 ) 

441 origin = intermediate_state 

442 merge += 1 

443 

444 if macrostate > 0: 

445 intermediate_state = self.n_states + merge 

446 target = last_merged 

447 self.full_pop[0, intermediate_state] = self.full_pop[ 

448 0, [origin, target] 

449 ].sum() 

450 self.Z[0, merge] = ( 

451 origin, 

452 target, 

453 0.9, 

454 self.full_pop[0, intermediate_state], 

455 ) 

456 last_merged = intermediate_state 

457 merge += 1 

458 else: 

459 last_merged = origin 

460 

461 self.tree 

462 self.pop_thr = 0 

463 self.q_min = 0.5 

464 

465 def save_macrostate_trajectory(self, out: Path, one_based: bool = False) -> None: 

466 """Write macrostate trajectory to a text file.""" 

467 header = ( 

468 f"# Created by Lumping class\n" 

469 f"# Time: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M')}\n" 

470 f"# Trajectory contains {self.n_macrostates[self.n_i]} states and {self.macrostate_trajectory.shape[1]} frames.\n" 

471 f"# Trajectory index: {self.n_i}\n" 

472 ) 

473 macrostate_trajectory = self.macrostate_trajectory[self.n_i] 

474 if one_based: 

475 macrostate_trajectory += 1 

476 np.savetxt(out, macrostate_trajectory, fmt="%.0f", header=header) 

477 

478 def save_Z(self, out: Path, n_i: int | Iterable | Literal["all"] = "all") -> None: 

479 """Save Z matrix. 

480 

481 Parameters 

482 ---------- 

483 out : Path 

484 Where to save the Z matrix. 

485 n_i : int or iterable of int or {"all"} 

486 Which lumpings to save. 

487 """ 

488 if not out.endswith(".npy"): 488 ↛ 489line 488 didn't jump to line 489 because the condition on line 488 was never true

489 out += ".npy" 

490 

491 if n_i == "all": 491 ↛ 493line 491 didn't jump to line 493 because the condition on line 491 was always true

492 np.save(out, self.Z) 

493 elif isinstance(n_i, Iterable): 

494 np.save(out, self.Z[n_i]) 

495 elif isinstance(n_i, int): 

496 np.save(out, self.Z[n_i : n_i + 1]) 

497 else: 

498 raise ValueError("n_i must be 'all', Iterable or int.") 

499 

500 def load_Z(self, Z): 

501 """Load Z matrix.""" 

502 if isinstance(Z, np.ndarray): 502 ↛ 503line 502 didn't jump to line 503 because the condition on line 502 was never true

503 self.Z = Z 

504 elif os.path.exists(Z): 504 ↛ 507line 504 didn't jump to line 507 because the condition on line 504 was always true

505 self.Z = np.load(Z) 

506 else: 

507 raise ValueError("Z must be a numpy array or a .npy file.") 

508 

509 self.n_runs = self.Z.shape[0] 

510 # n: number of macrostates 

511 tmat, states = mh.msm.estimate_markov_model( 

512 utils.get_multi_state_trajectory(self.trajectory, self.limits), 

513 self.lagtime, 

514 ) 

515 self.tmat = tmat.astype(float) 

516 _, self.pop = np.unique(self.trajectory, return_counts=True) 

517 self.n_states = len(states) 

518 self.full_pop = np.zeros((self.n_runs, 2 * self.n_states - 1), dtype=np.uint32) 

519 self.full_pop[:, : self.n_states] = self.pop 

520 self.full_pop[:, self.n_states :] = self.Z[:, :, 3] 

521 

522 self.assign_macrostates() 

523 

524 def save_rmsd(self, out): 

525 """Save RMSD of states to numpy file.""" 

526 np.save(out, self.rmsd) 

527 

528 def load_rmsd(self, f_name): 

529 """Save RMSD of states from numpy file.""" 

530 self._rmsd = np.load(f_name) 

531 

532 def draw_random_frames_indices( 

533 self, out: Path | None = None, n: int = 20 

534 ) -> npt.NDArray[np.int_] | None: 

535 """Draw n random frame indices for each macrostate. 

536 

537 Parameters 

538 ---------- 

539 out : Path, optional 

540 Directory where to save the .ndx files. If None, return 

541 indices instead of being saved. (default None) 

542 n : int 

543 Number of random frame indices to draw. (default 20) 

544 

545 Returns 

546 ------- 

547 ndarray of int, shape (N,) or None 

548 N random frame indices if out is None, otherwise retutrns 

549 None. 

550 """ 

551 drawn_frames = np.empty((self.n_macrostates[self.n_i], n), dtype=int) 

552 for state in np.arange(self.n_macrostates[self.n_i]): 

553 frames_in_state = np.where(self.macrostate_trajectory[self.n_i] == state)[0] 

554 drawn_frames[state] = np.random.choice( 

555 frames_in_state, size=n, replace=False 

556 ) 

557 if self.xtc_stride is not None: 

558 drawn_frames *= self.xtc_stride 

559 if out is not None: 

560 # Path(os.path.join(out)).mkdir(parents=True, exist_ok=True) 

561 out.mkdir(parents=True, exist_ok=True) 

562 for s, i in enumerate(drawn_frames): 

563 np.savetxt( 

564 os.path.join(out, f"{s + 1:02d}.ndx"), 

565 i, 

566 fmt="%.0f", 

567 header="[frames]", 

568 ) 

569 else: 

570 return drawn_frames 

571 

572 def draw_random_frames(self, out: Path, n: int = 20) -> None: 

573 """Draw n random frames per macrostate and write pdb files. 

574 

575 Parameters 

576 ---------- 

577 out : Path 

578 Directory where to save the .pdb files. 

579 n : int 

580 Number of random frame indices to draw. (default 20) 

581 """ 

582 for state in np.arange(self.n_macrostates[self.n_i]): 

583 frames_in_state = np.where(self.macrostate_trajectory[self.n_i] == state)[0] 

584 drawn_frames = np.random.choice(frames_in_state, size=n, replace=False) 

585 for i, frame in enumerate(drawn_frames): 

586 f = md.load_xtc( 

587 self.xtc_trajectory_file, 

588 top=self.topology_file, 

589 frame=frame, 

590 ) 

591 f.save_pdb(os.path.join(out, f"S{state}_{i:02d}.pdb")) 

592 

593 def get_best_defined_contacts(self, n: int = 3) -> npt.NDArray[np.int_]: 

594 """Return the feature indices with the least variance. 

595 

596 Calculate the variance for each feature for each macrostete for 

597 lumping n_i and return the indices for features with the least 

598 variance. 

599 

600 Parameters 

601 ---------- 

602 n : int 

603 Number of features to return. (default 3) 

604 

605 Returns 

606 ------- 

607 ndarray of int, shape (N, M) 

608 Indices of least varying features. N is the number of 

609 macrostates and M is the number of features required 

610 (parameter n). 

611 """ 

612 contacts = np.zeros((self.n_macrostates[self.n_i], n), dtype=int) 

613 for i in range(self.n_macrostates[self.n_i]): 

614 contacts[i] = np.argsort( 

615 np.var( 

616 self.multi_feature_trajectory[ 

617 self.macrostate_trajectory[self.n_i] == i 

618 ], 

619 axis=0, 

620 ) 

621 )[:n] 

622 return contacts 

623 

624 def get_least_moving_residues( 

625 self, contact_index_file: Path, n: int = 3 

626 ) -> list[npt.NDArray[np.int_]]: 

627 """Return residue indices of least varying feature. 

628 

629 Parameters 

630 ---------- 

631 contact_index_file : Path 

632 Path to contact index file. Each line contains the two 

633 residue indices space separated, which are part of the 

634 feature. The line index corresponds to the feature index. 

635 n : int 

636 Number of features to consider. (default 3) 

637 

638 Returns 

639 ------- 

640 list of ndarray of int 

641 One list entry per macrostate. The array contains the 

642 indices of the features with least variance for the 

643 respective macrostate. 

644 """ 

645 contact_indices = np.loadtxt(contact_index_file, dtype=int) 

646 contacts = self.get_best_defined_contacts(n) 

647 least_moving_residues = [] 

648 for c in contacts: 

649 least_moving_residues.append(np.unique(contact_indices[c].flatten())) 

650 return least_moving_residues 

651 

652 def write_least_moving_residues( 

653 self, contact_index_file: Path, out: Path, n: int = 3 

654 ) -> None: 

655 """Write indices of least moving residues to an index file. 

656 

657 Parameters 

658 ---------- 

659 contact_index_file : Path 

660 Path to contact index file. Each line contains the two 

661 residue indices space separated, which are part of the 

662 feature. The line index corresponds to the feature index. 

663 out : Path 

664 Target file for the residue indices. Each line holds the 

665 residue indices for one macrostate in a space separated 

666 list 

667 n : int 

668 Number of features to consider. (default 3) 

669 """ 

670 if contact_index_file != "none": 670 ↛ 678line 670 didn't jump to line 678 because the condition on line 670 was always true

671 least_moving_residues = self.get_least_moving_residues( 

672 contact_index_file, n=n 

673 ) 

674 with open(out, "w") as f: 

675 for residues in least_moving_residues: 

676 f.write(f"{' '.join(residues.astype(str))}\n") 

677 else: 

678 with open(out, "w") as f: 

679 f.write("") 

680 

681 def __add__( 

682 self, other: "Lumping" 

683 ) -> tuple["Lumping", "Lumping", npt.NDArray[np.floating]]: 

684 """Return the state overlap between two lumpings. 

685 

686 This is mostly utilized to analyze the similarity of stochastic 

687 lumpings to a deterministic one. 

688 

689 Parameters 

690 ---------- 

691 other : Lumping 

692 Another instance of Lumping with which to compare this 

693 Lumping instance. 

694 

695 Returns 

696 ------- 

697 tuple of ( 

698 Lumping, 

699 Lumping, 

700 (ndarray of float, shape (3, N, M)), 

701 ) 

702 The first Lumping object is the deterministic lumping to 

703 which the stochastic lumping (second Lumping instance) is 

704 compared. The array contains the results: First comes the 

705 union, second the fraction in reference macrostates and 

706 third the fraction in stochastic lumpings. N is the number 

707 of macrostates of the reference lumping and M the number of 

708 stochastic lumpings. The reference is not forcibly 

709 Lumping.reference, but can be any deterministic Lumping 

710 instance. 

711 """ 

712 if self.n_runs == 1 and other.n_runs >= 1: 712 ↛ 714line 712 didn't jump to line 714 because the condition on line 712 was never true

713 # reference 

714 ref = self 

715 # stochastic lumping 

716 sto = other 

717 elif other.n_runs == 1 and self.n_runs >= 1: 717 ↛ 721line 717 didn't jump to line 721 because the condition on line 717 was always true

718 ref = other 

719 sto = self 

720 else: 

721 raise ValueError("The reference lumping must have exactly one run.") 

722 return ref, sto, utils.similarity(ref, sto) 

723 

724 @property 

725 def reference(self) -> "Lumping": 

726 """The reference lupming of the system. 

727 

728 Only the transition probability is utilized to estimate the 

729 state similarity in the lumping process. 

730 

731 Returns 

732 ------- 

733 Lumping 

734 An instance of Lumping with reference parameters. 

735 """ 

736 if self._reference is None: 736 ↛ 749line 736 didn't jump to line 749 because the condition on line 736 was always true

737 k = kernel_module.LumpingKernel() 

738 self._reference = Lumping( 

739 self.trajectory, 

740 self.lagtime, 

741 self.multi_feature_trajectory, 

742 contact_threshold=self.contact_threshold, 

743 pop_thr=self.pop_thr, 

744 q_min=self.q_min, 

745 limits=self.limits, 

746 quiet=True, 

747 ) 

748 self._reference.run_mpp(k) 

749 return self._reference 

750 

751 @property 

752 def n_i(self) -> int: 

753 """Index of the lumping under consideration. 

754 

755 0 for deterministic lumpings. Is set to the lumping with the 

756 longest first implied timescale, if not set manually. 

757 

758 Returns 

759 ------- 

760 int 

761 Index of the lumping under consideration. 

762 """ 

763 if self._n_i is None: 

764 if self.n_runs > 1: 764 ↛ 765line 764 didn't jump to line 765 because the condition on line 764 was never true

765 self._n_i = np.argmax(self.timescales[:, 0]) 

766 else: 

767 self._n_i = 0 

768 return self._n_i 

769 

770 @n_i.setter 

771 def n_i(self, value: int) -> None: 

772 if not isinstance(value, int): 

773 raise ValueError("n_i must be an integer") 

774 self._n_i = value 

775 

776 @property 

777 def timescales(self) -> npt.NDArray[np.floating]: 

778 """Implied timescales property. 

779 

780 If more than the first 3 implied timescales are required, run 

781 the calc_timescales method separately. 

782 

783 Returns 

784 ------- 

785 ndarray of float, shape (N, M) 

786 First M implied timescales for all N runs. 

787 """ 

788 if self._timescales is None: 788 ↛ 790line 788 didn't jump to line 790 because the condition on line 788 was always true

789 self.calc_timescales() 

790 return self._timescales 

791 

792 def calc_timescales(self, ntimescales: int = 3) -> None: 

793 """Calculate implied timescales. 

794 

795 Parameters 

796 ---------- 

797 ntimescales : int 

798 Number of first implied timescales to calculate. 

799 """ 

800 self._timescales = np.zeros((self.n_runs, ntimescales)) 

801 for i, trajectory in enumerate(self.macrostate_trajectory): 

802 self._timescales[i, :] = mh.msm.implied_timescales( 

803 utils.get_multi_state_trajectory(trajectory, self.limits), 

804 [self.lagtime], 

805 ntimescales=ntimescales, 

806 )[0] 

807 

808 @property 

809 def linkage(self) -> npt.NDArray[np.floating]: 

810 """The linkage matrix. 

811 

812 The linkage matrix is derived from the Z matrix and utilized in 

813 former implementations of the MPP algorithm. 

814 

815 Returns 

816 ------- 

817 ndarray of float, shape (N-1, 3) 

818 For a system of N microstates, each line defines a merging. 

819 The state in the first column is merged with the state in 

820 the second column and the new state has the index of the 

821 second state, which is the more stable one. The 

822 metastability of the first state is stored in the third 

823 column. 

824 """ 

825 if self._linkage is None: 

826 self._linkage = utils.Z_to_linkage(self.Z[self.n_i]) 

827 return self._linkage 

828 

829 @property 

830 def macrostate_population(self) -> list[npt.NDArray[np.int_]]: 

831 """Return the macrostate populations for all runs.""" 

832 if self._macrostate_population is None: 

833 self._macrostate_population = [] 

834 for j, ma in enumerate(self.macrostate_assignment): 

835 self._macrostate_population.append( 

836 np.zeros(ma.shape[0], dtype=self.full_pop.dtype.type) 

837 ) 

838 for i, m in enumerate(ma): 

839 self._macrostate_population[-1][i] = self.full_pop[ 

840 j, : self.n_states 

841 ][m.astype(bool)].sum() 

842 return self._macrostate_population 

843 

844 @property 

845 def rmsd(self) -> npt.NDArray[np.floating]: 

846 """Return the root mean square deviation (RMSD) of C-alphas. 

847 

848 The RMSD of C-alphas is calculate so that it is minimal. The 

849 mean_frames attribute holds the mdtraj objects of the frames, 

850 which minimizes the RMSD, for each macrostate. 

851 

852 Returns 

853 ------- 

854 ndarray of float, shape (N, M) 

855 C-alpha RMSD values for N macrostates and M C-alpha atoms. 

856 """ 

857 if self._rmsd is None: 

858 self._rmsd, self.mean_frames = utils.calc_rmsd(self, quiet=self.quiet) 

859 return self._rmsd 

860 

861 def rmsd_sharpness(self) -> float: 

862 """Returns the RMSD sharpness of a lumping. 

863 

864 The RMSD sharpness is given by the population weighted mean of 

865 mean RMSDs: 

866 s = sum_j(mean_i(rmsd_ij) * p_j) / sum_j(p_j) 

867 for C-alpha i, macrostate j and population p. 

868 

869 Returns 

870 ------- 

871 float 

872 RMSD sharpness value 

873 

874 See Also 

875 -------- 

876 Lumping.rmsd : RMSD property. 

877 """ 

878 return ( 

879 self.rmsd.mean(axis=1) * self.macrostate_population[self.n_i] 

880 ).sum() / self.macrostate_population[self.n_i].sum() 

881 

882 @property 

883 def shannon_entropy(self) -> npt.NDArray[np.floating]: 

884 """The Shannon entropy. 

885 

886 The Shannon entropy is employed to measure how evenly the 

887 macrostate are populated in a lumping. The Shannon entropy 

888 penalizes in this case very low puplated macrostates. The lower 

889 the entropy, the more even the population partition of the 

890 macrostates. 

891 

892 Returns 

893 ------- 

894 ndarray of float, shape (Lumping.n_i,) 

895 Array contains the Shannon entropy for each run. 

896 

897 References 

898 ---------- 

899 [1] Claude C. Shannon, "A Mathematical Theory of Communication", 

900 The Bell System Technical Journal, Volume 27, Issue 3, 1948, 

901 DOI: 10.1002/j.1538-7305.1948.tb01338.x 

902 """ 

903 if self._shannon_entropy is None: 903 ↛ 907line 903 didn't jump to line 907 because the condition on line 903 was always true

904 self._shannon_entropy = np.zeros(self.n_runs) 

905 for i, pop in enumerate(self.macrostate_population): 

906 self._shannon_entropy[i] = utils.shannon_entropy(pop) 

907 return self._shannon_entropy 

908 

909 @property 

910 def davies_bouldin_index(self) -> npt.NDArray[np.floating]: 

911 """The davies bouldin index. 

912 

913 Returns 

914 ------- 

915 ndarray of float, shape (Lumping.n_i,) 

916 Array contains the Davies Bouldin index for each run. 

917 

918 References 

919 ---------- 

920 [1] David L. Davies; Donald W. Bouldin, "A Cluster Separation 

921 Measure", IEEE Transactions on Pattern Analysis and Machine 

922 Intelligence, Volume PAMI-1, Issue 2, pp. 224-227, 1979, 

923 DOI: 10.1109/TPAMI.1979.4766909 

924 """ 

925 if self._davies_bouldin_index is None: 925 ↛ 931line 925 didn't jump to line 931 because the condition on line 925 was always true

926 self._davies_bouldin_index = np.zeros(self.n_runs) 

927 for i in range(self.n_runs): 

928 self._davies_bouldin_index[i] = davies_bouldin_score( 

929 self.multi_feature_trajectory, self.macrostate_trajectory[i] 

930 ) 

931 return self._davies_bouldin_index 

932 

933 @property 

934 def gmrq(self) -> npt.NDArray[np.floating]: 

935 """The generalized matrix rayleigh quotient (GMRQ). 

936 

937 Returns 

938 ------- 

939 ndarray of float, shape (Lumping.n_i,) 

940 Array contains the GMRQ for each run. 

941 

942 References 

943 ---------- 

944 [1] Robert T. McGibbon; Vijay S. Pande, "Variational 

945 cross-validation of slow dynamical modes in molecular 

946 kinetics", The Journal of Chemical Physics, Volume 142, 

947 Issue 12, 2015, DOI: 10.1063/1.4916292 

948 """ 

949 if self._gmrq is None: 949 ↛ 951line 949 didn't jump to line 951 because the condition on line 949 was always true

950 self._gmrq = utils.gmrq(self.macrostate_tmat) 

951 return self._gmrq 

952 

953 @property 

954 def macro_micro_feature(self) -> npt.NDArray[np.floating]: 

955 """Assign macrostate feature values to corresponding microstates. 

956 

957 This is useful for the analysis of stochastic lumpings. 

958 

959 Returns 

960 ------- 

961 ndarray of float, shape (n_states, n_runs) 

962 Contains macrostate feature values for each microstate. 

963 

964 See Also 

965 -------- 

966 MPP.plot.macro_feature 

967 """ 

968 if self._macro_micro_feature is None: 968 ↛ 978line 968 didn't jump to line 978 because the condition on line 968 was always true

969 self._macro_micro_feature = np.zeros( 

970 (self.n_states, self.n_runs), 

971 dtype=self.mean_feature_trajectory.dtype.type, 

972 ) 

973 for i, (ma, mf) in enumerate( 

974 zip(self.macrostate_assignment, self.macrostate_feature) 

975 ): 

976 for j, mb in enumerate(ma.astype(bool)): 

977 self._macro_micro_feature[mb, i] = mf[j] 

978 return self._macro_micro_feature 

979 

980 @property 

981 def tree(self) -> list[core.BinaryTreeNode]: 

982 """The lumping tree. 

983 

984 This property holds the lumping trees of all lumpings performed 

985 by this object. The root node is stored for each lumping. 

986 

987 

988 """ 

989 if self._tree is None: 

990 self._tree = [] 

991 for z, pop in zip(self.Z, self.full_pop): 

992 self._tree.append(self.build_tree(z, pop)) 

993 return self._tree 

994 

995 def build_tree(self, Z, full_pop): 

996 """Build tree using BinaryTreeNode and return root""" 

997 n = Z.shape[0] + 1 

998 nodes = {} 

999 for i, (state, target_state, q, pop) in enumerate(Z): 

1000 state = int(state) 

1001 target_state = int(target_state) 

1002 if state not in nodes: 

1003 nodes[state] = core.BinaryTreeNode( 

1004 state, 

1005 self.tmat, 

1006 population=full_pop[state], 

1007 q=q, 

1008 pop_thr=self.pop_thr, 

1009 q_min=self.q_min, 

1010 ) 

1011 if target_state not in nodes: 

1012 nodes[target_state] = core.BinaryTreeNode( 

1013 target_state, 

1014 self.tmat, 

1015 population=full_pop[target_state], 

1016 q=q, 

1017 pop_thr=self.pop_thr, 

1018 q_min=self.q_min, 

1019 ) 

1020 nodes[n + i] = core.BinaryTreeNode( 

1021 n + i, 

1022 self.tmat, 

1023 q=q, 

1024 pop_thr=self.pop_thr, 

1025 q_min=self.q_min, 

1026 ) 

1027 nodes[n + i].left = nodes[state] 

1028 nodes[n + i].right = nodes[target_state] 

1029 for node in nodes[n + i].leaves: 

1030 node.feature = self.mean_feature[node.name] 

1031 return nodes[n + i] 

1032 

1033 @property 

1034 def trajectory(self) -> npt.NDArray[np.int_]: 

1035 """The microstate trajectory - 0-based.""" 

1036 return self._trajectory 

1037 

1038 @trajectory.setter 

1039 def trajectory(self, value): 

1040 if value.min() == 1: 

1041 value -= 1 

1042 warnings.warn("1-based trajectory was shifted to 0-based.") 

1043 if np.unique(value).shape[0] > value.max() + 1: 1043 ↛ 1044line 1043 didn't jump to line 1044 because the condition on line 1043 was never true

1044 raise ValueError("The state numbering in the trajectory is not continuous") 

1045 if value.max() < 2**7: 1045 ↛ 1046line 1045 didn't jump to line 1046 because the condition on line 1045 was never true

1046 trajectory_type = np.uint8 

1047 elif value.max() < 2**15: 1047 ↛ 1050line 1047 didn't jump to line 1050 because the condition on line 1047 was always true

1048 trajectory_type = np.uint16 

1049 else: 

1050 trajectory_type = np.uint32 

1051 self._trajectory = value.astype(trajectory_type) 

1052 

1053 @property 

1054 def topology_file(self): 

1055 """The topology_file property.""" 

1056 if self._topology_file is None: 1056 ↛ 1057line 1056 didn't jump to line 1057 because the condition on line 1056 was never true

1057 raise ValueError("No topology file set.") 

1058 return self._topology_file 

1059 

1060 @topology_file.setter 

1061 def topology_file(self, value): 

1062 if os.path.isfile(value): 1062 ↛ 1065line 1062 didn't jump to line 1065 because the condition on line 1062 was always true

1063 self._topology_file = value 

1064 else: 

1065 raise FileNotFoundError(f"No such file: {value}") 

1066 

1067 @property 

1068 def xtc_trajectory_file(self): 

1069 """The xtc_trajectory_file property.""" 

1070 if self._xtc_trajectory_file is None: 1070 ↛ 1071line 1070 didn't jump to line 1071 because the condition on line 1070 was never true

1071 raise ValueError("No xtc trajectory file set.") 

1072 return self._xtc_trajectory_file 

1073 

1074 @xtc_trajectory_file.setter 

1075 def xtc_trajectory_file(self, value): 

1076 if os.path.isfile(value): 1076 ↛ 1079line 1076 didn't jump to line 1079 because the condition on line 1076 was always true

1077 self._xtc_trajectory_file = value 

1078 else: 

1079 raise FileNotFoundError(f"No such file: {value}") 

1080 

1081 @property 

1082 def frame_length(self) -> float: 

1083 """The frame length in ns.""" 

1084 return self._frame_length 

1085 

1086 @frame_length.setter 

1087 def frame_length(self, value): 

1088 self._frame_length = value 

1089 

1090 

1091class Plotter: 

1092 def __init__(self, obj): 

1093 self._obj = obj 

1094 

1095 def dendrogram(self, out: str, scale=1, offset=0): 

1096 """Plot dendrogram""" 

1097 plot.plot_tree( 

1098 self._obj.tree[self._obj.n_i], 

1099 self._obj.macrostate_assignment[self._obj.n_i], 

1100 out, 

1101 scale=scale, 

1102 offset=offset, 

1103 ) 

1104 

1105 def implied_timescales(self, out, use_ref=True, scale=1): 

1106 """ 

1107 out: File to write plot 

1108 use_ref: If it for reference trajectory should be plotted 

1109 scale: scaling factor for plot 

1110 """ 

1111 if use_ref: 1111 ↛ 1112line 1111 didn't jump to line 1112 because the condition on line 1111 was never true

1112 ref_trajectory = self._obj.reference.macrostate_trajectory[0] 

1113 else: 

1114 ref_trajectory = self._obj.trajectory 

1115 

1116 macrostate_trajectory = utils.get_multi_state_trajectory( 

1117 self._obj.macrostate_trajectory[self._obj.n_i], self._obj.limits 

1118 ) 

1119 

1120 dlagtime = max(1, int(1 / self._obj.frame_length)) 

1121 plot.implied_timescales( 

1122 [ref_trajectory, macrostate_trajectory], 

1123 np.arange(1, 4.5 * self._obj.lagtime + dlagtime, dlagtime, dtype=int), 

1124 out, 

1125 frame_length=self._obj.frame_length, 

1126 first_ref=True, 

1127 scale=scale, 

1128 use_ref=use_ref, 

1129 ntimescales=self._obj.timescales.shape[1], 

1130 ) 

1131 

1132 def macro_feature(self, out, ref=None): 

1133 """ 

1134 Plot histogram of feature distribution. 

1135 

1136 micro_feature (np.ndarray, NxR): N microstates, R runs, holds feature 

1137 values of respective macrostate 

1138 out (str): file to save the plot 

1139 ref (list[tuple]): list of 

1140 - macrostate_assignment 

1141 - macrostate_feature 

1142 - color 

1143 - label 

1144 of the clusterings that should be shown explicitly. 

1145 """ 

1146 plot.macro_feature( 

1147 self._obj.macro_micro_feature, 

1148 out, 

1149 self._obj.reference if ref is None else ref, 

1150 ) 

1151 

1152 def rmsd(self, out, helices=None): 

1153 plot.rmsd( 

1154 self._obj.rmsd, self._obj.macrostate_population[self._obj.n_i], helices, out 

1155 ) 

1156 

1157 def delta_rmsd(self, out, helices=None): 

1158 plot.delta_rmsd( 

1159 self._obj.rmsd, self._obj.macrostate_population[self._obj.n_i], helices, out 

1160 ) 

1161 

1162 def contact_rep(self, cluster_file, out, scale=1): 

1163 plot.contact_rep( 

1164 self._obj.multi_feature_trajectory, 

1165 cluster_file, 

1166 self._obj.macrostate_trajectory[self._obj.n_i], 

1167 out, 

1168 utils.get_grid_format(self._obj.n_macrostates[self._obj.n_i]), 

1169 scale=scale, 

1170 ) 

1171 

1172 def relative_implied_timescales(self, out): 

1173 plot.relative_implied_timescales(self._obj, out) 

1174 

1175 def ck_test(self, out): 

1176 plot.chapman_kolmogorov(self._obj, out, self._obj.frame_length) 

1177 

1178 def state_network(self, out): 

1179 plot.state_network(self._obj, out) 

1180 

1181 def stochastic_state_similarity(self, out): 

1182 plot.stochastic_state_similarity(self._obj, self._obj.reference, out) 

1183 

1184 def transition_matrix(self, out): 

1185 plot.transition_matrix(self._obj.macrostate_tmat[self._obj.n_i], out) 

1186 

1187 def transition_time(self, out): 

1188 plot.transition_time( 

1189 self._obj.macrostate_tmat[self._obj.n_i], 

1190 out, 

1191 lagtime=self._obj.lagtime, 

1192 frame_length=self._obj.frame_length, 

1193 ) 

1194 

1195 def graph(self, out, u=0, f=0): 

1196 draw_knetwork( 

1197 self._obj.macrostate_trajectory[self._obj.n_i], 

1198 self._obj.lagtime, 

1199 self._obj.mean_feature_trajectory, 

1200 out, 

1201 u=u, 

1202 f=f, 

1203 ) 

1204 

1205 def sankey(self, out, ax=None, scale=1): 

1206 plot.sankey_diagram(self._obj, self._obj.reference, out, ax=ax, scale=scale) 

1207 

1208 def macrostate_trajectory(self, out, row_length=0.2): 

1209 plot.state_trajectory( 

1210 self._obj.macrostate_trajectory[self._obj.n_i], 

1211 out, 

1212 row_length=row_length, 

1213 frame_length=self._obj.frame_length, 

1214 )