Coverage for src / molecular_simulations / simulate / omm_simulator.py: 18%

308 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-12 10:07 -0600

1from copy import deepcopy 

2import logging 

3import MDAnalysis as mda 

4import numpy as np 

5from openmm import * 

6from openmm.app import * 

7from openmm.unit import * 

8from openmm.app.internal.singleton import Singleton 

9from pathlib import Path 

10from typing import Optional, Union 

11 

12PathLike = Union[Path, str] 

13OptPath = Union[Path, str, None] 

14 

15logger = logging.getLogger(__name__) 

16 

17class Simulator: 

18 """ 

19 Class for performing OpenMM simulations on AMBER FF inputs. Inputs must conform 

20 to naming conventions found below in the init. 

21 

22 Arguments: 

23 path (PathLike): Path to simulation inputs, same as output path. 

24 top_name (Optional[str]): Defaults to None. Optional topology file name. If not 

25 provided, we will assume this is `system.prmtop`. 

26 coor_name (Optional[str]): Defaults to None. Optional coordinate file name. If not 

27 provided, we will assume this is `system.inpcrd`. 

28 out_path (Optional[Path]): Defaults to None. Optional output path which is where we 

29 will place the simulation outputs. If not provided, we will assume this is  

30 `self.path`. 

31 ff (str): Switch for whether or not to utilize AMBER or CHARMM ff. 

32 equil_steps (int): Defaults to 1,250,000 (2.5 ns at 2 fs timestep). Number of simulation  

33 timesteps to perform equilibration. 

34 prod_steps (int): Defaults to 250,000,000 (1 µs at 4 fs timestep). Number of simulation  

35 timesteps to perform production MD. 

36 n_equil_cycles (int): Defaults to 3 cycles. Number of additional unrestrained  

37 equilibration cycles to perform. Increasing this may triage unstable systems  

38 at the cost of more simulation time albeit this may be negligible in the  

39 grand scheme of things. 

40 temperature (float): Defaults to 300.0. Temperature for simulation. 

41 eq_reporter_frequency (int): Defaults to 1,000 timesteps. How often to write out to 

42 the logs and trajectory files in equilibration. 

43 prod_reporter_frequency (int): Defaults to 10,000 timesteps. How often to write out to 

44 the logs and trajectory files in production. 

45 platform (str): Defaults to CUDA. Which OpenMM platform to simulate with (options 

46 include CUDA, CPU, OpenCL). 

47 device_ids (list[int]): Defaults to [0]. Which accelerators to use (multiple GPU  

48 support is tenuous with OpenMM). Primarily used to distribute jobs to different 

49 GPUs on a node of an HPC resource. 

50 force_constant (float): Defaults to 10.0 kcal/mol*Å^2. Force constant to use for  

51 harmonic restraints during equilibration. Currently restraints are only applied 

52 to protein backbone atoms. 

53 params (Optional[str]): Optional list of CHARMM parameter files for loading from  

54 psf/pdb file using CHARMM36m forcefield. 

55 """ 

56 def __init__(self, 

57 path: PathLike, 

58 top_name: Optional[str]=None, 

59 coor_name: Optional[str]=None, 

60 out_path: Optional[Path]=None, 

61 ff: str='amber', 

62 equil_steps: int=1_250_000, 

63 prod_steps: int=250_000_000, 

64 n_equil_cycles: int=3, 

65 temperature: float=300., 

66 eq_reporter_frequency: int=1_000, 

67 prod_reporter_frequency: int=10_000, 

68 platform: str='CUDA', 

69 device_ids: list[int]=[0], 

70 force_constant: float=10., 

71 params: Optional[str]=None, 

72 membrane: bool=False): 

73 self.path = Path(path) # enforce path object 

74 self.top_file = self.path / top_name if top_name is not None else self.path / 'system.prmtop' 

75 self.coor_file = self.path / coor_name if coor_name is not None else self.path / 'system.inpcrd' 

76 self.temperature = temperature 

77 

78 self.ff = ff.lower() 

79 self.params = params # for charmm parameter sets 

80 self.setup_barostat(membrane) 

81 

82 if out_path is not None: 

83 p = out_path 

84 else: 

85 p = path 

86 

87 # logging/checkpointing stuff 

88 self.eq_state = p / 'eq.state' 

89 self.eq_chkpt = p / 'eq.chk' 

90 self.eq_log = p / 'eq.log' 

91 self.eq_dcd = p / 'eq.dcd' 

92 self.eq_freq = eq_reporter_frequency 

93 

94 self.dcd = p / 'prod.dcd' 

95 self.restart = p / 'prod.rst.chk' 

96 self.state = p / 'prod.state' 

97 self.chkpt = p / 'prod.chk' 

98 self.prod_log = p / 'prod.log' 

99 self.prod_freq = prod_reporter_frequency 

100 

101 # simulation details 

102 self.equil_cycles = n_equil_cycles 

103 self.equil_steps = equil_steps 

104 self.prod_steps = prod_steps 

105 self.k = force_constant 

106 self.platform = Platform.getPlatformByName(platform) 

107 self.properties = {'DeviceIndex': ','.join([str(x) for x in device_ids]), 

108 'Precision': 'mixed'} 

109 

110 if platform == 'CPU': 

111 self.properties = {} 

112 

113 def setup_barostat(self, 

114 is_membrane_system: bool) -> None: 

115 """Chooses the correct barostat for the system based on whether or not there is 

116 a membrane present. 

117 

118 Args: 

119 is_membrane_system (bool): True if this is a membrane-containing system. 

120  

121 Returns: 

122 None 

123 """ 

124 self.barostat_args = { 

125 'defaultPressure': 1 * bar, 

126 } 

127 

128 if is_membrane_system: 

129 self.barostat = MonteCarloMembraneBarostat 

130 self.barostat_args.update({ 

131 'defaultSurfaceTension': 0 * bar * nm, 

132 'defaultTemperature': self.temperature * kelvin, 

133 'xymode': MonteCarloMembraneBarostat.XYIsotropic, 

134 'zmode': MonteCarloMembraneBarostat.ZFree 

135 }) 

136 else: 

137 self.barostat = MonteCarloBarostat 

138 self.barostat_args.update({ 

139 'temperature': self.temperature * kelvin 

140 }) 

141 

142 def load_system(self) -> System: 

143 """Loads correct set of files based on which forcefield we have specified. 

144 

145 Raises: 

146 AttributeError: If a non-valid choice of forcefield is made (not amber or charmm). 

147 

148 Returns: 

149 System: OpenMM system loaded from correct files/forcefield. 

150 """ 

151 if self.ff == 'amber': 

152 system = self.load_amber_files() 

153 elif self.ff == 'charmm': 

154 system = self.load_charmm_files() 

155 else: 

156 raise AttributeError(f'self.ff must be a valid MD forcefield [amber, charmm]!') 

157 

158 if not hasattr(self, 'indices'): 

159 self.indices = self.get_restraint_indices() 

160 

161 return system 

162 

163 def load_amber_files(self) -> System: 

164 """Builds an OpenMM system using the prmtop/inpcrd files. PME is utilized for 

165 electrostatics and a 1 nm non-bonded cutoff is used as well as 1.5 amu HMR. 

166 

167 Returns: 

168 (System): OpenMM system 

169 """ 

170 if not hasattr(self, 'coordinate'): 

171 self.coordinate = AmberInpcrdFile(str(self.coor_file)) 

172 self.topology = AmberPrmtopFile(str(self.top_file), 

173 periodicBoxVectors=self.coordinate.boxVectors) 

174 

175 system = self.topology.createSystem(nonbondedMethod=PME, 

176 removeCMMotion=False, 

177 nonbondedCutoff=1. * nanometer, 

178 constraints=HBonds, 

179 hydrogenMass=1.5 * amu) 

180 

181 return system 

182 

183 def load_charmm_files(self) -> System: 

184 """Builds an OpenMM system using the psf/pdb files. PME is utilized for  

185 electrostatics and a 1 nm non-bonded cutoff is used as well as 1.5 amu HMR. 

186 

187 Returns: 

188 (System): OpenMM system 

189 """ 

190 if not hasattr(self, 'coordinate'): 

191 self.coordinate = PDBFile(str(self.coor_file)) 

192 self.topology = CharmmPsfFile(str(self.top_file), 

193 periodicBoxVectors=self.coordinate.topology.getPeriodicBoxVectors()) 

194 if not hasattr(self, parameter_set) and self.params is not None: 

195 self.parameter_set = CharmmParameterSet(*self.params) 

196 

197 if self.params is None: 

198 self.forcefield = ForceField('charmm36_2024.xml', 'charmm36/water.xml') 

199 system = self.forcefield.createSystem(self.coordinate.topology, 

200 nonbondedMethod=PME, 

201 nonbondedCutoff=1.2 * nanometer, 

202 constraints=HBonds) 

203 else: 

204 system = self.topology.createSystem(self.parameter_set, 

205 nonbondedMethod=PME, 

206 nonbondedCutoff=1.2 * nanometer, 

207 constraints=HBonds) 

208 

209 return system 

210 

211 def setup_sim(self, 

212 system: System, 

213 dt: float) -> tuple[Simulation, Integrator]: 

214 """Builds OpenMM Integrator and Simulation objects utilizing a provided 

215 OpenMM System object and integration timestep, dt. 

216 

217 Arguments: 

218 system (System): OpenMM system to build simulation object of. 

219 dt (float): Integration timestep in units of picoseconds. 

220 

221 Returns: 

222 (tuple[Simulation, Integrator]): A tuple containing both the Simulation 

223 and Integrator objects. 

224 """ 

225 integrator = LangevinMiddleIntegrator(self.temperature*kelvin, 

226 1/picosecond, 

227 dt*picoseconds) 

228 simulation = Simulation(self.topology.topology, 

229 system, 

230 integrator, 

231 self.platform, 

232 self.properties) 

233 

234 return simulation, integrator 

235 

236 def run(self) -> None: 

237 """ 

238 Main logic of class. Determines whether or not we are restarting based on 

239 if all the equilibration outputs are present. Importantly the state file 

240 will not be written out until equilibration is complete. Also checks the 

241 production MD log to see if we have finished, otherwise decrements our 

242 progress from the total number of timesteps. Finally, runs production MD. 

243 """ 

244 skip_eq = all([f.exists() 

245 for f in [self.eq_state, self.eq_chkpt, self.eq_log]]) 

246 if not skip_eq: 

247 logger.info('No restart detected, will begin equilibration.') 

248 self.equilibrate() 

249 logger.info(f'Equilibration finished, running {self.prod_steps} steps of production MD.') 

250 

251 if self.restart.exists(): 

252 logger.info('Checkpoint file detected, resuming simulation.') 

253 self.check_num_steps_left() 

254 logger.info(f'Will run {self.prod_steps} steps of production MD.') 

255 self.production(chkpt=str(self.restart), 

256 restart=True) 

257 else: 

258 self.production(chkpt=str(self.eq_chkpt), 

259 restart=False) 

260 

261 logger.info('Production MD run complete.') 

262 

263 def equilibrate(self) -> Simulation: 

264 """ 

265 Sets up and runs equilibrium MD, including energy minimization to begin. 

266 Sets backbone harmonic restraints and performs a slow heating protocol 

267 before periodically lessening restraints, finishing with user-specified 

268 number of rounds of unrestrained equilibration. 

269 

270 Returns: 

271 (Simulation): OpenMM simulation object. 

272 """ 

273 system = self.load_system() 

274 system = self.add_backbone_posres(system, 

275 self.coordinate.positions, 

276 self.topology.topology.atoms(), 

277 self.indices, 

278 self.k) 

279 

280 simulation, integrator = self.setup_sim(system, dt=0.002) 

281 

282 simulation.context.setPositions(self.coordinate.positions) 

283 simulation.minimizeEnergy() 

284 

285 simulation.reporters.append(StateDataReporter(str(self.eq_log), 

286 self.eq_freq, 

287 step=True, 

288 potentialEnergy=True, 

289 speed=True, 

290 temperature=True)) 

291 simulation.reporters.append(DCDReporter(str(self.eq_dcd), self.eq_freq)) 

292 

293 simulation, integrator = self._heating(simulation, integrator) 

294 simulation = self._equilibrate(simulation) 

295 

296 return simulation 

297 

298 def production(self, 

299 chkpt: PathLike, 

300 restart: bool=False) -> None: 

301 """ 

302 Performs production MD. Loads a new system, integrator and simulation object 

303 and loads equilibration or past production checkpoint to allow either the 

304 continuation of a past simulation or to inherit the outputs of equilibration. 

305 

306 Arguments: 

307 chkpt (PathLike): The checkpoint file to load. Should be either the equilibration 

308 checkpoint or a past production checkpoint. 

309 restart (bool): Defaults to False. Flag to ensure we log the full simulation 

310 to reporter log. Otherwise restarts will overwrite the original log file. 

311 """ 

312 system = self.load_system() 

313 simulation, _ = self.setup_sim(system, dt=0.004) 

314 

315 system.addForce(self.barostat(*self.barostat_args.values())) 

316 simulation.context.reinitialize(True) 

317 

318 if restart: 

319 log_file = open(str(self.prod_log), 'a') 

320 else: 

321 log_file = str(self.prod_log) 

322 

323 simulation = self.load_checkpoint(simulation, chkpt) 

324 simulation = self.attach_reporters(simulation, 

325 self.dcd, 

326 log_file, 

327 str(self.restart), 

328 restart=restart) 

329 

330 self.simulation = self._production(simulation) # save simulation object 

331 

332 def load_checkpoint(self, 

333 simulation: Simulation, 

334 checkpoint: PathLike) -> Simulation: 

335 """ 

336 Loads a previous checkpoint into provided simulation object. 

337  

338 Arguments: 

339 simulation (Simulation): OpenMM simulation object. 

340 checkpoint (PathLike): OpenMM checkpoint file. 

341 

342 Returns: 

343 (Simulation): OpenMM simulation object that has been checkpointed. 

344 """ 

345 simulation.loadCheckpoint(checkpoint) 

346 state = simulation.context.getState(getVelocities=True, getPositions=True) 

347 positions = state.getPositions() 

348 velocities = state.getVelocities() 

349 

350 simulation.context.setPositions(positions) 

351 simulation.context.setVelocities(velocities) 

352 

353 return simulation 

354 

355 def attach_reporters(self, 

356 simulation: Simulation, 

357 dcd_file: PathLike, 

358 log_file: PathLike, 

359 rst_file: PathLike, 

360 restart: bool=False) -> Simulation: 

361 """ 

362 Attaches a StateDataReporter for logging, CheckpointReporter to output  

363 periodic checkpoints and a DCDReporter to output trajectory data to  

364 simulation object. 

365 

366 Arguments: 

367 simulation (Simulation): OpenMM simulation object. 

368 dcd_file (PathLike): DCD file to write to. 

369 log_file (PathLike): Log file to write to. 

370 rst_file (PathLike): Checkpoint file to write to. 

371 restart (bool): Defaults to False. Whether or not we should be appending 

372 to an existing DCD file or writing a new one, potentially overwriting 

373 an existing DCD (if false). 

374 

375 Returns: 

376 (Simulation): OpenMM simulation object with reporters now attached. 

377 """ 

378 simulation.reporters.extend([ 

379 DCDReporter( 

380 dcd_file, 

381 self.prod_freq, 

382 append=restart 

383 ), 

384 StateDataReporter( 

385 log_file, 

386 self.prod_freq, 

387 step=True, 

388 potentialEnergy=True, 

389 temperature=True, 

390 progress=True, 

391 remainingTime=True, 

392 speed=True, 

393 volume=True, 

394 totalSteps=self.prod_steps, 

395 separator='\t' 

396 ), 

397 CheckpointReporter( 

398 rst_file, 

399 self.prod_freq * 10 

400 ) 

401 ]) 

402 

403 return simulation 

404 

405 def _heating(self, 

406 simulation: Simulation, 

407 integrator: Integrator) -> tuple[Simulation, Integrator]: 

408 """ 

409 Slow heating protocol. Seeds velocities at 5 kelvin to begin, and 

410 over a period of 100,000 timesteps gradually heats up to 300 kelvin 

411 in 1,000 discrete jumps. This should generalize to most systems but 

412 if your system requires more control these hard-coded values will need 

413 to be changed here. 

414 

415 Arguments: 

416 simulation (Simulation): OpenMM simulation object. 

417 integrator (Integrator): OpenMM integrator object. 

418 

419 Returns: 

420 (tuple[Simulation, Integrator]): Tuple containing simulation and 

421 integrator objects after heating has been performed. 

422 """ 

423 simulation.context.setVelocitiesToTemperature(5*kelvin) 

424 T = 5 

425 

426 integrator.setTemperature(T * kelvin) 

427 mdsteps = 100000 

428 heat_steps = 1000 

429 length = mdsteps // heat_steps 

430 tstep = (self.temperature - T) / length 

431 for i in range(length): 

432 simulation.step(heat_steps) 

433 temp = T + tstep * (1 + i) 

434 

435 if temp > self.temperature: 

436 temp = self.temperature 

437 

438 integrator.setTemperature(temp * kelvin) 

439 

440 return simulation, integrator 

441 

442 def _equilibrate(self, 

443 simulation: Simulation) -> Simulation: 

444 """ 

445 Equilibration procotol.  

446 (1) Performs a 5-step restraint relaxation in NVT ensemble wherein  

447 each step 1/5 of the restraint is relaxed until there are no harmonic  

448 restraints. Length of each step is also 1/5 of the total equil_steps  

449 set by the user.  

450 (2) One step-length of unrestrained NVT is performed before turning on  

451 barostat for NPT. 

452 (3) `equil_cycles` number of step-lengths are then ran with NPT before 

453 saving out the state and checkpoints. 

454 

455 Arguments: 

456 simulation (Simulation): OpenMM simulation object. 

457 

458 Returns: 

459 (Simulation): Equilibrated OpenMM simulation object. 

460 """ 

461 simulation.context.reinitialize(True) 

462 n_levels = 5 

463 d_k = self.k / n_levels 

464 eq_steps = self.equil_steps // n_levels 

465 

466 for i in range(n_levels): 

467 simulation.step(eq_steps) 

468 k = float(self.k - (i * d_k)) 

469 simulation.context.setParameter('k', (k * kilocalories_per_mole/angstroms**2)) 

470 

471 simulation.context.setParameter('k', 0) 

472 simulation.step(eq_steps) 

473 

474 simulation.system.addForce(self.barostat(*self.barostat_args.values())) 

475 simulation.step(self.equil_cycles * eq_steps) 

476 

477 simulation.saveState(str(self.eq_state)) 

478 simulation.saveCheckpoint(str(self.eq_chkpt)) 

479 

480 return simulation 

481 

482 def _production(self, 

483 simulation: Simulation) -> Simulation: 

484 """ 

485 Production MD procotol. Runs for specified number of timesteps before 

486 saving out a final state and checkpoint file. 

487 

488 Arguments: 

489 simulation (Simulation): OpenMM simulation object. 

490 

491 Returns: 

492 (Simulation): OpenMM simulation object. 

493 """ 

494 simulation.step(self.prod_steps) 

495 simulation.saveState(str(self.state)) 

496 simulation.saveCheckpoint(str(self.chkpt)) 

497 

498 return simulation 

499 

500 def get_restraint_indices(self, 

501 addtl_selection: str='') -> list[int]: 

502 """ 

503 Obtains atom indices that will be used to set harmonic restraints. First 

504 loads an MDAnalysis universe with the input prmtop and inpcrd files. Uses 

505 a base selection of protein or nucleic acid backbone but if provided an 

506 additional selection can be included for things like restraining ligand 

507 molecules. 

508 

509 Arguments: 

510 addtl_selection (str): Defaults to empty string. If provided, will 

511 be used to define additional atoms for restraining. 

512 

513 Returns: 

514 (list[int]): List of atomic indices for atoms to be restrained. 

515 """ 

516 u = mda.Universe(str(self.top_file), str(self.coor_file)) 

517 if addtl_selection: 

518 sel = u.select_atoms(f'backbone or nucleicbackbone or {addtl_selection}') 

519 else: 

520 sel = u.select_atoms('backbone or nucleicbackbone') 

521 

522 return sel.atoms.ix 

523 

524 def check_num_steps_left(self) -> None: 

525 """ 

526 Reads the production log file to see if we have completed a simulation. 

527 If there is still simulation to be completed, decrements the existing progress 

528 from internal number of steps to perform. Additionally, accounts for any frames 

529 that have been written to DCD that are not accounted for in the log. 

530 

531 Returns: 

532 None 

533 """ 

534 prod_log = open(str(self.prod_log)).readlines() 

535 

536 try: 

537 last_line = prod_log[-1] 

538 last_step = int(last_line.split()[1].strip()) 

539 except IndexError: 

540 try: 

541 last_line = prod_log[-2] 

542 last_step = int(last_line.split()[1].strip()) 

543 except IndexError: # something weird happend just run full time 

544 return 

545 

546 if time_left := (self.prod_steps - last_step): 

547 self.prod_steps -= time_left 

548 

549 if n_repeat_timesteps := (last_step % (self.prod_freq * 10)): 

550 self.prod_steps -= n_repeat_timesteps 

551 n_repeat_frames = n_repeat_timesteps / self.prod_freq 

552 

553 n_total_frames = last_step / self.prod_freq 

554 

555 lines = [f'{n_total_frames - n_repeat_frames},{n_total_frames}'] 

556 duplicate_log = self.path / 'duplicate_frames.log' 

557 if duplicate_log.exists(): 

558 mode = 'a' 

559 else: 

560 mode = 'w' 

561 lines = ['first_frame,last_frame'] + lines 

562 

563 with open(str(duplicate_log), mode) as fout: 

564 fout.write('\n'.join(lines)) 

565 

566 @staticmethod 

567 def add_backbone_posres(system: System, 

568 positions: np.ndarray, 

569 atoms: list[topology.Atom], 

570 indices: list[int], 

571 restraint_force: float=10.) -> System: 

572 """ 

573 Adds harmonic restraints to an OpenMM system. 

574 

575 Arguments: 

576 system (System): OpenMM system object. 

577 positions (np.ndarray): Position array for all atoms in system. 

578 atoms (list[topology.Atom]): List of all Atom objects in system. 

579 indices (list[int]): List of atomic indices to restrain. 

580 restraint_force (float): Defaults to 10.0 kcal/mol*Å^2. The force 

581 constant to use for harmonic restraints. 

582 

583 Returns: 

584 (System): OpenMM system with harmonic restraints. 

585 """ 

586 force = CustomExternalForce("k*periodicdistance(x, y, z, x0, y0, z0)^2") 

587 

588 force_amount = restraint_force * kilocalories_per_mole/angstroms**2 

589 force.addGlobalParameter("k", force_amount) 

590 force.addPerParticleParameter("x0") 

591 force.addPerParticleParameter("y0") 

592 force.addPerParticleParameter("z0") 

593 

594 for i, (atom_crd, atom) in enumerate(zip(positions, atoms)): 

595 if atom.index in indices: 

596 force.addParticle(i, atom_crd.value_in_unit(nanometers)) 

597 

598 posres_sys = deepcopy(system) 

599 posres_sys.addForce(force) 

600 

601 return posres_sys 

602 

603class ImplicitSimulator(Simulator): 

604 """ 

605 Implicit solvent simulator. Inherits from Simulator and overloads relevant 

606 methods to support the slight differences in how implicit solvent is implemented 

607 in OpenMM. 

608  

609 Arguments: 

610 path (PathLike): Path to simulation inputs, same as output path. 

611 top_name (Optional[str]): Defaults to None. Optional topology file name. If not 

612 provided, we will assume this is `system.prmtop`. 

613 coor_name (Optional[str]): Defaults to None. Optional coordinate file name. If not 

614 provided, we will assume this is `system.inpcrd`. 

615 out_path (Optional[Path]): Defaults to None. Optional output path which is where we 

616 will place the simulation outputs. If not provided, we will assume this is  

617 `self.path`. 

618 ff (str): Switch for whether or not to utilize AMBER or CHARMM ff. 

619 equil_steps (int): Defaults to 1,250,000 (2.5 ns at 2 fs timestep). Number of simulation  

620 timesteps to perform equilibration. 

621 prod_steps (int): Defaults to 250,000,000 (1 µs at 4 fs timestep). Number of simulation  

622 timesteps to perform production MD. 

623 n_equil_cycles (int): Defaults to 3 cycles. Number of additional unrestrained  

624 equilibration cycles to perform. Increasing this may triage unstable systems  

625 at the cost of more simulation time albeit this may be negligible in the  

626 grand scheme of things. 

627 temperature (float): Defaults to 300.0. Temperature for simulation. 

628 eq_reporter_frequency (int): Defaults to 1,000 timesteps. How often to write out to 

629 the logs and trajectory files in equilibration. 

630 prod_reporter_frequency (int): Defaults to 10,000 timesteps. How often to write out to 

631 the logs and trajectory files in production. 

632 platform (str): Defaults to CUDA. Which OpenMM platform to simulate with (options 

633 include CUDA, CPU, OpenCL). 

634 device_ids (list[int]): Defaults to [0]. Which accelerators to use (multiple GPU  

635 support is tenuous with OpenMM). Primarily used to distribute jobs to different 

636 GPUs on a node of an HPC resource. 

637 force_constant (float): Defaults to 10.0 kcal/mol*Å^2. Force constant to use for  

638 harmonic restraints during equilibration. Currently restraints are only applied 

639 to protein backbone atoms. 

640 implicit_solvent (Singleton): Defaults to GBn2. Which implicit solvent model to use. 

641 solute_dielectric (float): Defaults to 1.0. Probably shouldn't change this. 

642 solvent_dielectric (float): Defaults to 78.5. Also shouldn't change this. 

643 """ 

644 def __init__(self, 

645 path: str, 

646 top_name: Optional[str]=None, 

647 coor_name: Optional[str]=None, 

648 out_path: Optional[Path]=None, 

649 ff: str = 'amber', 

650 equil_steps: int=1_250_000, 

651 prod_steps: int=250_000_000, 

652 n_equil_cycles: int=3, 

653 temperature: float=300., 

654 eq_reporter_frequency: int=1_000, 

655 prod_reporter_frequency: int=10_000, 

656 platform: str='CUDA', 

657 device_ids: list[int]=[0], 

658 force_constant: float=10., 

659 implicit_solvent: Singleton=GBn2, 

660 solute_dielectric: float=1., 

661 solvent_dielectric: float=78.5, 

662 **kwargs): 

663 super().__init__(path=path, top_name=top_name, 

664 coor_name=coor_name, out_path=out_path, 

665 ff=ff, equil_steps=equil_steps, 

666 prod_steps=prod_steps, 

667 n_equil_cycles=n_equil_cycles, 

668 temperature=temperature, 

669 eq_reporter_frequency=eq_reporter_frequency, 

670 prod_reporter_frequency=prod_reporter_frequency, 

671 platform=platform, device_ids=device_ids, 

672 force_constant=force_constant) 

673 self.solvent = implicit_solvent 

674 self.solute_dielectric = solute_dielectric 

675 self.solvent_dielectric = solvent_dielectric 

676 # solvent screening parameter for 150mM ions 

677 # k = 367.434915 * sqrt(conc. [M] / (solvent_dielectric * T)) 

678 self.kappa = 367.434915 * np.sqrt(.15 / (solvent_dielectric * 300)) 

679 

680 def load_amber_files(self) -> System: 

681 """ 

682 Loads an OpenMM system object with implicit solvent parameters. 

683 

684 Returns: 

685 (System): OpenMM system object. 

686 """ 

687 if not hasattr(self, 'coordinate'): 

688 self.coordinate = AmberInpcrdFile(str(self.coor_file)) 

689 self.topology = AmberPrmtopFile(str(self.top_file)) 

690 

691 system = self.topology.createSystem(nonbondedMethod=NoCutoff, 

692 removeCMMotion=False, 

693 constraints=HBonds, 

694 hydrogenMass=1.5 * amu, 

695 implicitSolvent=self.solvent, 

696 soluteDielectric=self.solute_dielectric, 

697 solventDielectric=self.solvent_dielectric, 

698 implicitSolventKappa=self.kappa/nanometer) 

699 

700 return system 

701 

702 def equilibrate(self) -> Simulation: 

703 """ 

704 Runs reduced equilibration protocol. Due to the faster convergence of 

705 using implicit solvent we don't need to be quite as rigorous as with 

706 explicit solvent system relaxation. 

707 

708 Returns: 

709 (Simulation): OpenMM simulation object. 

710 """ 

711 system = self.load_system() 

712 system = self.add_backbone_posres(system, 

713 self.coordinate.positions, 

714 self.topology.topology.atoms(), 

715 self.indices, 

716 self.k) 

717 

718 simulation, integrator = self.setup_sim(system, dt=0.002) 

719 

720 simulation.context.setPositions(self.coordinate.positions) 

721 state = simulation.context.getState(getEnergy=True) 

722 print(f'Energy before minimization: {state.getPotentialEnergy()}') 

723 simulation.minimizeEnergy() 

724 state = simulation.context.getState(getEnergy=True) 

725 print(f'Energy after minimization: {state.getPotentialEnergy()}') 

726 

727 simulation.reporters.append(StateDataReporter(str(self.eq_log), 

728 self.eq_freq, 

729 step=True, 

730 potentialEnergy=True, 

731 speed=True, 

732 temperature=True)) 

733 simulation.reporters.append(DCDReporter(str(self.eq_dcd), self.eq_freq)) 

734 

735 simulation, integrator = self._heating(simulation, integrator) 

736 simulation = self._equilibrate(simulation) 

737 

738 return simulation 

739 

740 def production(self, 

741 chkpt: PathLike, 

742 restart: bool=False) -> None: 

743 """ 

744 Performs production MD. Loads a new system, integrator and simulation object 

745 and loads equilibration or past production checkpoint to allow either the 

746 continuation of a past simulation or to inherit the outputs of equilibration. 

747 

748 Arguments: 

749 chkpt (PathLike): The checkpoint file to load. Should be either the equilibration 

750 checkpoint or a past production checkpoint. 

751 restart (bool): Defaults to False. Flag to ensure we log the full simulation 

752 to reporter log. Otherwise restarts will overwrite the original log file. 

753 """ 

754 system = self.load_system() 

755 simulation, _ = self.setup_sim(system, dt=0.004) 

756 

757 simulation.context.reinitialize(True) 

758 

759 if restart: 

760 log_file = open(str(self.prod_log), 'a') 

761 else: 

762 log_file = str(self.prod_log) 

763 

764 simulation = self.load_checkpoint(simulation, chkpt) 

765 simulation = self.attach_reporters(simulation, 

766 self.dcd, 

767 log_file, 

768 str(self.restart), 

769 restart=restart) 

770 

771 self.simulation = self._production(simulation) # save simulation object 

772 

773class CustomForcesSimulator(Simulator): 

774 """ 

775 Simulator for utilizing custom user-defined forces. Inherits from Simulator while 

776 providing a way to inject custom forces. 

777 

778 Arguments: 

779 path (PathLike): Path to simulation inputs, same as output path. 

780 custom_force_objects (list): ?? 

781 equil_steps (int): Defaults to 1,250,000 (2.5 ns). Number of simulation timesteps to 

782 perform equilibration for (2fs timestep). 

783 prod_steps (int): Defaults to 250,000,000 (1 µs). Number of simulation timesteps to 

784 perform production MD for (4fs timestep). 

785 n_equil_cycles (int): Defaults to 3 cycles. Number of additional unrestrained  

786 equilibration cycles to perform. Increasing this may triage unstable systems  

787 at the cost of more simulation time albeit this may be negligible in the  

788 grand scheme of things. 

789 reporter_frequency (int): Defaults to 1,000 timesteps. How often to write out to 

790 the logs and trajectory files in equilibration. X times less frequently during 

791 production MD. 

792 platform (str): Defaults to CUDA. Which OpenMM platform to simulate with (options 

793 include CUDA, CPU, OpenCL). 

794 device_ids (list[int]): Defaults to [0]. Which accelerators to use (multiple GPU  

795 support is tenuous with OpenMM). Primarily used to distribute jobs to different 

796 GPUs on a node of an HPC resource. 

797 force_constant (float): Defaults to 10.0 kcal/mol*Å^2. Force constant to use for  

798 harmonic restraints during equilibration. Currently restraints are only applied 

799 to protein backbone atoms. 

800 """ 

801 def __init__(self, 

802 path: str, 

803 custom_force_objects: list, 

804 equil_steps: int=1_250_000, 

805 prod_steps: int=250_000_000, 

806 n_equil_cycles: int=3, 

807 reporter_frequency: int=1_000, 

808 platform: str='CUDA', 

809 device_ids: list[int]=[0], 

810 equilibration_force_constant: float=10.): 

811 super().__init__(path, equil_steps, prod_steps, n_equil_cycles, 

812 reporter_frequency, platform, device_ids, 

813 equilibration_force_constant) 

814 self.custom_forces = custom_force_objects 

815 

816 def load_amber_files(self) -> System: 

817 """ 

818 Loads OpenMM system and adds in custom forces. 

819 

820 Returns: 

821 (System): OpenMM system object with custom forces. 

822 """ 

823 if not hasattr(self, 'coordinate'): 

824 self.coor_file = self.path / 'system.inpcrd' 

825 self.top_file = self.path / 'system.prmtop' 

826 self.coordinate = AmberInpcrdFile(str(self.coor_file)) 

827 self.topology = AmberPrmtopFile(str(self.top_file), 

828 periodicBoxVectors=self.coordinate.boxVectors) 

829 

830 system = self.topology.createSystem(nonbondedMethod=PME, 

831 removeCMMotion=False, 

832 nonbondedCutoff=1. * nanometer, 

833 constraints=HBonds, 

834 hydrogenMass=1.5 * amu) 

835 

836 system = self.add_forces(system) 

837 

838 return system 

839 

840 def add_forces(self, 

841 system: System) -> System: 

842 """ 

843 Systematically adds all custom forces to provided system. 

844 

845 Arguments: 

846 system (System): OpenMM system object. 

847 

848 Returns: 

849 (System): OpenMM system object with custom forces added. 

850 """ 

851 for custom_force in self.custom_forces: 

852 system.addForce(custom_force) 

853 

854 return system 

855 

856class Minimizer: 

857 """ 

858 Class for just performing energy minimization and writing out minimized structures. 

859 

860 Arguments: 

861 topology (PathLike): Topology file which can be either a PDB or an AMBER prmtop. 

862 coordinates (OptPath): Defaults to None. If using a prmtop, you need to provide 

863 an inpcrd file for the coordinates. Otherwise these are also acquired from 

864 the PDB used as topology. 

865 out (PathLike): Defaults to min.pdb. The name of the output minimized PDB file. 

866 The path is inherited from the parent path of the topology file. 

867 platform (str): Defaults to OpenCL. Which OpenMM platform to run. 

868 device_ids (list[int]): Accelerator IDs. 

869 """ 

870 def __init__(self, 

871 topology: PathLike, 

872 coordinates: PathLike, 

873 out: PathLike='min.pdb', 

874 platform: str='OpenCL', 

875 device_ids: list[int] | None =[0]): 

876 self.topology = Path(topology) 

877 self.coordinates = Path(coordinates) 

878 

879 self.path = self.topology.parent 

880 self.out = self.path / out 

881 self.platform = Platform.getPlatformByName(platform) 

882 self.properties = {'Precision': 'mixed'} 

883 

884 if device_ids is not None: 

885 self.properties.update({'DeviceIndex': ','.join([str(x) for x in device_ids])}) 

886 

887 def minimize(self) -> None: 

888 """ 

889 Loads an OpenMM system, builds simulation object and runs energy minimization. 

890 Dumps final coordinates into output PDB file. 

891 

892 Returns: 

893 None 

894 """ 

895 system = self.load_files() 

896 integrator = LangevinMiddleIntegrator(300*kelvin, 

897 1/picosecond, 

898 0.001*picoseconds) 

899 simulation = Simulation(self.topology, 

900 system, 

901 integrator, 

902 self.platform, 

903 self.properties) 

904 

905 simulation.context.setPositions(self.coordinates.positions) 

906 

907 simulation.minimizeEnergy() 

908 

909 state = simulation.context.getState(getPositions=True) 

910 positions = state.getPositions() 

911 

912 PDBFile.writeFile(self.topology.topology, 

913 positions, 

914 file=str(self.out), 

915 keepIds=True) 

916 

917 def load_files(self) -> None: 

918 """ 

919 Loads an OpenMM system depending on what file types you have provided 

920 for the topology (AMBER, PDB, etc). 

921 

922 Returns: 

923 None 

924 """ 

925 if self.topology.suffix in ['.prmtop', '.parm7']: 

926 system = self.load_amber() 

927 elif self.topology.suffix == '.top': 

928 system = self.load_gromacs() 

929 elif self.topology.suffix == '.pdb': 

930 system = self.load_pdb() 

931 else: 

932 raise FileNotFoundError('No viable simulation input files found' 

933 f'at path: {self.path}!') 

934 

935 return system 

936 

937 def load_amber(self) -> System: 

938 """ 

939 Loads AMBER input files into OpenMM System. 

940 

941 Returns: 

942 (System): OpenMM system object. 

943 """ 

944 self.coordinates = AmberInpcrdFile(str(self.coordinates)) 

945 self.topology = AmberPrmtopFile(str(self.topology), 

946 periodicBoxVectors=self.coordinates.boxVectors) 

947 

948 system = self.topology.createSystem(nonbondedMethod=NoCutoff, 

949 constraints=HBonds) 

950 

951 return system 

952 

953 def load_gromacs(self) -> System: 

954 """ 

955 Loads GROMACS input files into OpenMM System. UNTESTED! 

956 

957 Returns: 

958 (System): OpenMM system object. 

959 """ 

960 gro = list(self.path.glob('*.gro'))[0] 

961 self.coordinates = GromacsGroFile(str(gro)) 

962 self.topology = GromacsTopFile(str(self.topology), 

963 includeDir='/usr/local/gromacs/share/gromacs/top') 

964 

965 system = self.topology.createSystem(nonbondedMethod=NoCutoff, 

966 constraints=HBonds) 

967 

968 return system 

969 

970 def load_pdb(self) -> System: 

971 """ 

972 Loads PDB into OpenMM System. Beware the topology guesser. 

973 

974 Returns: 

975 (System): OpenMM system object. 

976 """ 

977 self.coordinates = PDBFile(str(self.topology)) 

978 self.topology = self.coordinates.topology 

979 forcefield = ForceField('amber14-all.xml') 

980 

981 system = forcefield.createSystem(self.topology, 

982 nonbondedMethod=NoCutoff, 

983 constraints=HBonds) 

984 

985 return system