Coverage for src / molecular_simulations / simulate / omm_simulator.py: 18%
308 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-12 10:07 -0600
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-12 10:07 -0600
1from copy import deepcopy
2import logging
3import MDAnalysis as mda
4import numpy as np
5from openmm import *
6from openmm.app import *
7from openmm.unit import *
8from openmm.app.internal.singleton import Singleton
9from pathlib import Path
10from typing import Optional, Union
12PathLike = Union[Path, str]
13OptPath = Union[Path, str, None]
15logger = logging.getLogger(__name__)
17class Simulator:
18 """
19 Class for performing OpenMM simulations on AMBER FF inputs. Inputs must conform
20 to naming conventions found below in the init.
22 Arguments:
23 path (PathLike): Path to simulation inputs, same as output path.
24 top_name (Optional[str]): Defaults to None. Optional topology file name. If not
25 provided, we will assume this is `system.prmtop`.
26 coor_name (Optional[str]): Defaults to None. Optional coordinate file name. If not
27 provided, we will assume this is `system.inpcrd`.
28 out_path (Optional[Path]): Defaults to None. Optional output path which is where we
29 will place the simulation outputs. If not provided, we will assume this is
30 `self.path`.
31 ff (str): Switch for whether or not to utilize AMBER or CHARMM ff.
32 equil_steps (int): Defaults to 1,250,000 (2.5 ns at 2 fs timestep). Number of simulation
33 timesteps to perform equilibration.
34 prod_steps (int): Defaults to 250,000,000 (1 µs at 4 fs timestep). Number of simulation
35 timesteps to perform production MD.
36 n_equil_cycles (int): Defaults to 3 cycles. Number of additional unrestrained
37 equilibration cycles to perform. Increasing this may triage unstable systems
38 at the cost of more simulation time albeit this may be negligible in the
39 grand scheme of things.
40 temperature (float): Defaults to 300.0. Temperature for simulation.
41 eq_reporter_frequency (int): Defaults to 1,000 timesteps. How often to write out to
42 the logs and trajectory files in equilibration.
43 prod_reporter_frequency (int): Defaults to 10,000 timesteps. How often to write out to
44 the logs and trajectory files in production.
45 platform (str): Defaults to CUDA. Which OpenMM platform to simulate with (options
46 include CUDA, CPU, OpenCL).
47 device_ids (list[int]): Defaults to [0]. Which accelerators to use (multiple GPU
48 support is tenuous with OpenMM). Primarily used to distribute jobs to different
49 GPUs on a node of an HPC resource.
50 force_constant (float): Defaults to 10.0 kcal/mol*Å^2. Force constant to use for
51 harmonic restraints during equilibration. Currently restraints are only applied
52 to protein backbone atoms.
53 params (Optional[str]): Optional list of CHARMM parameter files for loading from
54 psf/pdb file using CHARMM36m forcefield.
55 """
56 def __init__(self,
57 path: PathLike,
58 top_name: Optional[str]=None,
59 coor_name: Optional[str]=None,
60 out_path: Optional[Path]=None,
61 ff: str='amber',
62 equil_steps: int=1_250_000,
63 prod_steps: int=250_000_000,
64 n_equil_cycles: int=3,
65 temperature: float=300.,
66 eq_reporter_frequency: int=1_000,
67 prod_reporter_frequency: int=10_000,
68 platform: str='CUDA',
69 device_ids: list[int]=[0],
70 force_constant: float=10.,
71 params: Optional[str]=None,
72 membrane: bool=False):
73 self.path = Path(path) # enforce path object
74 self.top_file = self.path / top_name if top_name is not None else self.path / 'system.prmtop'
75 self.coor_file = self.path / coor_name if coor_name is not None else self.path / 'system.inpcrd'
76 self.temperature = temperature
78 self.ff = ff.lower()
79 self.params = params # for charmm parameter sets
80 self.setup_barostat(membrane)
82 if out_path is not None:
83 p = out_path
84 else:
85 p = path
87 # logging/checkpointing stuff
88 self.eq_state = p / 'eq.state'
89 self.eq_chkpt = p / 'eq.chk'
90 self.eq_log = p / 'eq.log'
91 self.eq_dcd = p / 'eq.dcd'
92 self.eq_freq = eq_reporter_frequency
94 self.dcd = p / 'prod.dcd'
95 self.restart = p / 'prod.rst.chk'
96 self.state = p / 'prod.state'
97 self.chkpt = p / 'prod.chk'
98 self.prod_log = p / 'prod.log'
99 self.prod_freq = prod_reporter_frequency
101 # simulation details
102 self.equil_cycles = n_equil_cycles
103 self.equil_steps = equil_steps
104 self.prod_steps = prod_steps
105 self.k = force_constant
106 self.platform = Platform.getPlatformByName(platform)
107 self.properties = {'DeviceIndex': ','.join([str(x) for x in device_ids]),
108 'Precision': 'mixed'}
110 if platform == 'CPU':
111 self.properties = {}
113 def setup_barostat(self,
114 is_membrane_system: bool) -> None:
115 """Chooses the correct barostat for the system based on whether or not there is
116 a membrane present.
118 Args:
119 is_membrane_system (bool): True if this is a membrane-containing system.
121 Returns:
122 None
123 """
124 self.barostat_args = {
125 'defaultPressure': 1 * bar,
126 }
128 if is_membrane_system:
129 self.barostat = MonteCarloMembraneBarostat
130 self.barostat_args.update({
131 'defaultSurfaceTension': 0 * bar * nm,
132 'defaultTemperature': self.temperature * kelvin,
133 'xymode': MonteCarloMembraneBarostat.XYIsotropic,
134 'zmode': MonteCarloMembraneBarostat.ZFree
135 })
136 else:
137 self.barostat = MonteCarloBarostat
138 self.barostat_args.update({
139 'temperature': self.temperature * kelvin
140 })
142 def load_system(self) -> System:
143 """Loads correct set of files based on which forcefield we have specified.
145 Raises:
146 AttributeError: If a non-valid choice of forcefield is made (not amber or charmm).
148 Returns:
149 System: OpenMM system loaded from correct files/forcefield.
150 """
151 if self.ff == 'amber':
152 system = self.load_amber_files()
153 elif self.ff == 'charmm':
154 system = self.load_charmm_files()
155 else:
156 raise AttributeError(f'self.ff must be a valid MD forcefield [amber, charmm]!')
158 if not hasattr(self, 'indices'):
159 self.indices = self.get_restraint_indices()
161 return system
163 def load_amber_files(self) -> System:
164 """Builds an OpenMM system using the prmtop/inpcrd files. PME is utilized for
165 electrostatics and a 1 nm non-bonded cutoff is used as well as 1.5 amu HMR.
167 Returns:
168 (System): OpenMM system
169 """
170 if not hasattr(self, 'coordinate'):
171 self.coordinate = AmberInpcrdFile(str(self.coor_file))
172 self.topology = AmberPrmtopFile(str(self.top_file),
173 periodicBoxVectors=self.coordinate.boxVectors)
175 system = self.topology.createSystem(nonbondedMethod=PME,
176 removeCMMotion=False,
177 nonbondedCutoff=1. * nanometer,
178 constraints=HBonds,
179 hydrogenMass=1.5 * amu)
181 return system
183 def load_charmm_files(self) -> System:
184 """Builds an OpenMM system using the psf/pdb files. PME is utilized for
185 electrostatics and a 1 nm non-bonded cutoff is used as well as 1.5 amu HMR.
187 Returns:
188 (System): OpenMM system
189 """
190 if not hasattr(self, 'coordinate'):
191 self.coordinate = PDBFile(str(self.coor_file))
192 self.topology = CharmmPsfFile(str(self.top_file),
193 periodicBoxVectors=self.coordinate.topology.getPeriodicBoxVectors())
194 if not hasattr(self, parameter_set) and self.params is not None:
195 self.parameter_set = CharmmParameterSet(*self.params)
197 if self.params is None:
198 self.forcefield = ForceField('charmm36_2024.xml', 'charmm36/water.xml')
199 system = self.forcefield.createSystem(self.coordinate.topology,
200 nonbondedMethod=PME,
201 nonbondedCutoff=1.2 * nanometer,
202 constraints=HBonds)
203 else:
204 system = self.topology.createSystem(self.parameter_set,
205 nonbondedMethod=PME,
206 nonbondedCutoff=1.2 * nanometer,
207 constraints=HBonds)
209 return system
211 def setup_sim(self,
212 system: System,
213 dt: float) -> tuple[Simulation, Integrator]:
214 """Builds OpenMM Integrator and Simulation objects utilizing a provided
215 OpenMM System object and integration timestep, dt.
217 Arguments:
218 system (System): OpenMM system to build simulation object of.
219 dt (float): Integration timestep in units of picoseconds.
221 Returns:
222 (tuple[Simulation, Integrator]): A tuple containing both the Simulation
223 and Integrator objects.
224 """
225 integrator = LangevinMiddleIntegrator(self.temperature*kelvin,
226 1/picosecond,
227 dt*picoseconds)
228 simulation = Simulation(self.topology.topology,
229 system,
230 integrator,
231 self.platform,
232 self.properties)
234 return simulation, integrator
236 def run(self) -> None:
237 """
238 Main logic of class. Determines whether or not we are restarting based on
239 if all the equilibration outputs are present. Importantly the state file
240 will not be written out until equilibration is complete. Also checks the
241 production MD log to see if we have finished, otherwise decrements our
242 progress from the total number of timesteps. Finally, runs production MD.
243 """
244 skip_eq = all([f.exists()
245 for f in [self.eq_state, self.eq_chkpt, self.eq_log]])
246 if not skip_eq:
247 logger.info('No restart detected, will begin equilibration.')
248 self.equilibrate()
249 logger.info(f'Equilibration finished, running {self.prod_steps} steps of production MD.')
251 if self.restart.exists():
252 logger.info('Checkpoint file detected, resuming simulation.')
253 self.check_num_steps_left()
254 logger.info(f'Will run {self.prod_steps} steps of production MD.')
255 self.production(chkpt=str(self.restart),
256 restart=True)
257 else:
258 self.production(chkpt=str(self.eq_chkpt),
259 restart=False)
261 logger.info('Production MD run complete.')
263 def equilibrate(self) -> Simulation:
264 """
265 Sets up and runs equilibrium MD, including energy minimization to begin.
266 Sets backbone harmonic restraints and performs a slow heating protocol
267 before periodically lessening restraints, finishing with user-specified
268 number of rounds of unrestrained equilibration.
270 Returns:
271 (Simulation): OpenMM simulation object.
272 """
273 system = self.load_system()
274 system = self.add_backbone_posres(system,
275 self.coordinate.positions,
276 self.topology.topology.atoms(),
277 self.indices,
278 self.k)
280 simulation, integrator = self.setup_sim(system, dt=0.002)
282 simulation.context.setPositions(self.coordinate.positions)
283 simulation.minimizeEnergy()
285 simulation.reporters.append(StateDataReporter(str(self.eq_log),
286 self.eq_freq,
287 step=True,
288 potentialEnergy=True,
289 speed=True,
290 temperature=True))
291 simulation.reporters.append(DCDReporter(str(self.eq_dcd), self.eq_freq))
293 simulation, integrator = self._heating(simulation, integrator)
294 simulation = self._equilibrate(simulation)
296 return simulation
298 def production(self,
299 chkpt: PathLike,
300 restart: bool=False) -> None:
301 """
302 Performs production MD. Loads a new system, integrator and simulation object
303 and loads equilibration or past production checkpoint to allow either the
304 continuation of a past simulation or to inherit the outputs of equilibration.
306 Arguments:
307 chkpt (PathLike): The checkpoint file to load. Should be either the equilibration
308 checkpoint or a past production checkpoint.
309 restart (bool): Defaults to False. Flag to ensure we log the full simulation
310 to reporter log. Otherwise restarts will overwrite the original log file.
311 """
312 system = self.load_system()
313 simulation, _ = self.setup_sim(system, dt=0.004)
315 system.addForce(self.barostat(*self.barostat_args.values()))
316 simulation.context.reinitialize(True)
318 if restart:
319 log_file = open(str(self.prod_log), 'a')
320 else:
321 log_file = str(self.prod_log)
323 simulation = self.load_checkpoint(simulation, chkpt)
324 simulation = self.attach_reporters(simulation,
325 self.dcd,
326 log_file,
327 str(self.restart),
328 restart=restart)
330 self.simulation = self._production(simulation) # save simulation object
332 def load_checkpoint(self,
333 simulation: Simulation,
334 checkpoint: PathLike) -> Simulation:
335 """
336 Loads a previous checkpoint into provided simulation object.
338 Arguments:
339 simulation (Simulation): OpenMM simulation object.
340 checkpoint (PathLike): OpenMM checkpoint file.
342 Returns:
343 (Simulation): OpenMM simulation object that has been checkpointed.
344 """
345 simulation.loadCheckpoint(checkpoint)
346 state = simulation.context.getState(getVelocities=True, getPositions=True)
347 positions = state.getPositions()
348 velocities = state.getVelocities()
350 simulation.context.setPositions(positions)
351 simulation.context.setVelocities(velocities)
353 return simulation
355 def attach_reporters(self,
356 simulation: Simulation,
357 dcd_file: PathLike,
358 log_file: PathLike,
359 rst_file: PathLike,
360 restart: bool=False) -> Simulation:
361 """
362 Attaches a StateDataReporter for logging, CheckpointReporter to output
363 periodic checkpoints and a DCDReporter to output trajectory data to
364 simulation object.
366 Arguments:
367 simulation (Simulation): OpenMM simulation object.
368 dcd_file (PathLike): DCD file to write to.
369 log_file (PathLike): Log file to write to.
370 rst_file (PathLike): Checkpoint file to write to.
371 restart (bool): Defaults to False. Whether or not we should be appending
372 to an existing DCD file or writing a new one, potentially overwriting
373 an existing DCD (if false).
375 Returns:
376 (Simulation): OpenMM simulation object with reporters now attached.
377 """
378 simulation.reporters.extend([
379 DCDReporter(
380 dcd_file,
381 self.prod_freq,
382 append=restart
383 ),
384 StateDataReporter(
385 log_file,
386 self.prod_freq,
387 step=True,
388 potentialEnergy=True,
389 temperature=True,
390 progress=True,
391 remainingTime=True,
392 speed=True,
393 volume=True,
394 totalSteps=self.prod_steps,
395 separator='\t'
396 ),
397 CheckpointReporter(
398 rst_file,
399 self.prod_freq * 10
400 )
401 ])
403 return simulation
405 def _heating(self,
406 simulation: Simulation,
407 integrator: Integrator) -> tuple[Simulation, Integrator]:
408 """
409 Slow heating protocol. Seeds velocities at 5 kelvin to begin, and
410 over a period of 100,000 timesteps gradually heats up to 300 kelvin
411 in 1,000 discrete jumps. This should generalize to most systems but
412 if your system requires more control these hard-coded values will need
413 to be changed here.
415 Arguments:
416 simulation (Simulation): OpenMM simulation object.
417 integrator (Integrator): OpenMM integrator object.
419 Returns:
420 (tuple[Simulation, Integrator]): Tuple containing simulation and
421 integrator objects after heating has been performed.
422 """
423 simulation.context.setVelocitiesToTemperature(5*kelvin)
424 T = 5
426 integrator.setTemperature(T * kelvin)
427 mdsteps = 100000
428 heat_steps = 1000
429 length = mdsteps // heat_steps
430 tstep = (self.temperature - T) / length
431 for i in range(length):
432 simulation.step(heat_steps)
433 temp = T + tstep * (1 + i)
435 if temp > self.temperature:
436 temp = self.temperature
438 integrator.setTemperature(temp * kelvin)
440 return simulation, integrator
442 def _equilibrate(self,
443 simulation: Simulation) -> Simulation:
444 """
445 Equilibration procotol.
446 (1) Performs a 5-step restraint relaxation in NVT ensemble wherein
447 each step 1/5 of the restraint is relaxed until there are no harmonic
448 restraints. Length of each step is also 1/5 of the total equil_steps
449 set by the user.
450 (2) One step-length of unrestrained NVT is performed before turning on
451 barostat for NPT.
452 (3) `equil_cycles` number of step-lengths are then ran with NPT before
453 saving out the state and checkpoints.
455 Arguments:
456 simulation (Simulation): OpenMM simulation object.
458 Returns:
459 (Simulation): Equilibrated OpenMM simulation object.
460 """
461 simulation.context.reinitialize(True)
462 n_levels = 5
463 d_k = self.k / n_levels
464 eq_steps = self.equil_steps // n_levels
466 for i in range(n_levels):
467 simulation.step(eq_steps)
468 k = float(self.k - (i * d_k))
469 simulation.context.setParameter('k', (k * kilocalories_per_mole/angstroms**2))
471 simulation.context.setParameter('k', 0)
472 simulation.step(eq_steps)
474 simulation.system.addForce(self.barostat(*self.barostat_args.values()))
475 simulation.step(self.equil_cycles * eq_steps)
477 simulation.saveState(str(self.eq_state))
478 simulation.saveCheckpoint(str(self.eq_chkpt))
480 return simulation
482 def _production(self,
483 simulation: Simulation) -> Simulation:
484 """
485 Production MD procotol. Runs for specified number of timesteps before
486 saving out a final state and checkpoint file.
488 Arguments:
489 simulation (Simulation): OpenMM simulation object.
491 Returns:
492 (Simulation): OpenMM simulation object.
493 """
494 simulation.step(self.prod_steps)
495 simulation.saveState(str(self.state))
496 simulation.saveCheckpoint(str(self.chkpt))
498 return simulation
500 def get_restraint_indices(self,
501 addtl_selection: str='') -> list[int]:
502 """
503 Obtains atom indices that will be used to set harmonic restraints. First
504 loads an MDAnalysis universe with the input prmtop and inpcrd files. Uses
505 a base selection of protein or nucleic acid backbone but if provided an
506 additional selection can be included for things like restraining ligand
507 molecules.
509 Arguments:
510 addtl_selection (str): Defaults to empty string. If provided, will
511 be used to define additional atoms for restraining.
513 Returns:
514 (list[int]): List of atomic indices for atoms to be restrained.
515 """
516 u = mda.Universe(str(self.top_file), str(self.coor_file))
517 if addtl_selection:
518 sel = u.select_atoms(f'backbone or nucleicbackbone or {addtl_selection}')
519 else:
520 sel = u.select_atoms('backbone or nucleicbackbone')
522 return sel.atoms.ix
524 def check_num_steps_left(self) -> None:
525 """
526 Reads the production log file to see if we have completed a simulation.
527 If there is still simulation to be completed, decrements the existing progress
528 from internal number of steps to perform. Additionally, accounts for any frames
529 that have been written to DCD that are not accounted for in the log.
531 Returns:
532 None
533 """
534 prod_log = open(str(self.prod_log)).readlines()
536 try:
537 last_line = prod_log[-1]
538 last_step = int(last_line.split()[1].strip())
539 except IndexError:
540 try:
541 last_line = prod_log[-2]
542 last_step = int(last_line.split()[1].strip())
543 except IndexError: # something weird happend just run full time
544 return
546 if time_left := (self.prod_steps - last_step):
547 self.prod_steps -= time_left
549 if n_repeat_timesteps := (last_step % (self.prod_freq * 10)):
550 self.prod_steps -= n_repeat_timesteps
551 n_repeat_frames = n_repeat_timesteps / self.prod_freq
553 n_total_frames = last_step / self.prod_freq
555 lines = [f'{n_total_frames - n_repeat_frames},{n_total_frames}']
556 duplicate_log = self.path / 'duplicate_frames.log'
557 if duplicate_log.exists():
558 mode = 'a'
559 else:
560 mode = 'w'
561 lines = ['first_frame,last_frame'] + lines
563 with open(str(duplicate_log), mode) as fout:
564 fout.write('\n'.join(lines))
566 @staticmethod
567 def add_backbone_posres(system: System,
568 positions: np.ndarray,
569 atoms: list[topology.Atom],
570 indices: list[int],
571 restraint_force: float=10.) -> System:
572 """
573 Adds harmonic restraints to an OpenMM system.
575 Arguments:
576 system (System): OpenMM system object.
577 positions (np.ndarray): Position array for all atoms in system.
578 atoms (list[topology.Atom]): List of all Atom objects in system.
579 indices (list[int]): List of atomic indices to restrain.
580 restraint_force (float): Defaults to 10.0 kcal/mol*Å^2. The force
581 constant to use for harmonic restraints.
583 Returns:
584 (System): OpenMM system with harmonic restraints.
585 """
586 force = CustomExternalForce("k*periodicdistance(x, y, z, x0, y0, z0)^2")
588 force_amount = restraint_force * kilocalories_per_mole/angstroms**2
589 force.addGlobalParameter("k", force_amount)
590 force.addPerParticleParameter("x0")
591 force.addPerParticleParameter("y0")
592 force.addPerParticleParameter("z0")
594 for i, (atom_crd, atom) in enumerate(zip(positions, atoms)):
595 if atom.index in indices:
596 force.addParticle(i, atom_crd.value_in_unit(nanometers))
598 posres_sys = deepcopy(system)
599 posres_sys.addForce(force)
601 return posres_sys
603class ImplicitSimulator(Simulator):
604 """
605 Implicit solvent simulator. Inherits from Simulator and overloads relevant
606 methods to support the slight differences in how implicit solvent is implemented
607 in OpenMM.
609 Arguments:
610 path (PathLike): Path to simulation inputs, same as output path.
611 top_name (Optional[str]): Defaults to None. Optional topology file name. If not
612 provided, we will assume this is `system.prmtop`.
613 coor_name (Optional[str]): Defaults to None. Optional coordinate file name. If not
614 provided, we will assume this is `system.inpcrd`.
615 out_path (Optional[Path]): Defaults to None. Optional output path which is where we
616 will place the simulation outputs. If not provided, we will assume this is
617 `self.path`.
618 ff (str): Switch for whether or not to utilize AMBER or CHARMM ff.
619 equil_steps (int): Defaults to 1,250,000 (2.5 ns at 2 fs timestep). Number of simulation
620 timesteps to perform equilibration.
621 prod_steps (int): Defaults to 250,000,000 (1 µs at 4 fs timestep). Number of simulation
622 timesteps to perform production MD.
623 n_equil_cycles (int): Defaults to 3 cycles. Number of additional unrestrained
624 equilibration cycles to perform. Increasing this may triage unstable systems
625 at the cost of more simulation time albeit this may be negligible in the
626 grand scheme of things.
627 temperature (float): Defaults to 300.0. Temperature for simulation.
628 eq_reporter_frequency (int): Defaults to 1,000 timesteps. How often to write out to
629 the logs and trajectory files in equilibration.
630 prod_reporter_frequency (int): Defaults to 10,000 timesteps. How often to write out to
631 the logs and trajectory files in production.
632 platform (str): Defaults to CUDA. Which OpenMM platform to simulate with (options
633 include CUDA, CPU, OpenCL).
634 device_ids (list[int]): Defaults to [0]. Which accelerators to use (multiple GPU
635 support is tenuous with OpenMM). Primarily used to distribute jobs to different
636 GPUs on a node of an HPC resource.
637 force_constant (float): Defaults to 10.0 kcal/mol*Å^2. Force constant to use for
638 harmonic restraints during equilibration. Currently restraints are only applied
639 to protein backbone atoms.
640 implicit_solvent (Singleton): Defaults to GBn2. Which implicit solvent model to use.
641 solute_dielectric (float): Defaults to 1.0. Probably shouldn't change this.
642 solvent_dielectric (float): Defaults to 78.5. Also shouldn't change this.
643 """
644 def __init__(self,
645 path: str,
646 top_name: Optional[str]=None,
647 coor_name: Optional[str]=None,
648 out_path: Optional[Path]=None,
649 ff: str = 'amber',
650 equil_steps: int=1_250_000,
651 prod_steps: int=250_000_000,
652 n_equil_cycles: int=3,
653 temperature: float=300.,
654 eq_reporter_frequency: int=1_000,
655 prod_reporter_frequency: int=10_000,
656 platform: str='CUDA',
657 device_ids: list[int]=[0],
658 force_constant: float=10.,
659 implicit_solvent: Singleton=GBn2,
660 solute_dielectric: float=1.,
661 solvent_dielectric: float=78.5,
662 **kwargs):
663 super().__init__(path=path, top_name=top_name,
664 coor_name=coor_name, out_path=out_path,
665 ff=ff, equil_steps=equil_steps,
666 prod_steps=prod_steps,
667 n_equil_cycles=n_equil_cycles,
668 temperature=temperature,
669 eq_reporter_frequency=eq_reporter_frequency,
670 prod_reporter_frequency=prod_reporter_frequency,
671 platform=platform, device_ids=device_ids,
672 force_constant=force_constant)
673 self.solvent = implicit_solvent
674 self.solute_dielectric = solute_dielectric
675 self.solvent_dielectric = solvent_dielectric
676 # solvent screening parameter for 150mM ions
677 # k = 367.434915 * sqrt(conc. [M] / (solvent_dielectric * T))
678 self.kappa = 367.434915 * np.sqrt(.15 / (solvent_dielectric * 300))
680 def load_amber_files(self) -> System:
681 """
682 Loads an OpenMM system object with implicit solvent parameters.
684 Returns:
685 (System): OpenMM system object.
686 """
687 if not hasattr(self, 'coordinate'):
688 self.coordinate = AmberInpcrdFile(str(self.coor_file))
689 self.topology = AmberPrmtopFile(str(self.top_file))
691 system = self.topology.createSystem(nonbondedMethod=NoCutoff,
692 removeCMMotion=False,
693 constraints=HBonds,
694 hydrogenMass=1.5 * amu,
695 implicitSolvent=self.solvent,
696 soluteDielectric=self.solute_dielectric,
697 solventDielectric=self.solvent_dielectric,
698 implicitSolventKappa=self.kappa/nanometer)
700 return system
702 def equilibrate(self) -> Simulation:
703 """
704 Runs reduced equilibration protocol. Due to the faster convergence of
705 using implicit solvent we don't need to be quite as rigorous as with
706 explicit solvent system relaxation.
708 Returns:
709 (Simulation): OpenMM simulation object.
710 """
711 system = self.load_system()
712 system = self.add_backbone_posres(system,
713 self.coordinate.positions,
714 self.topology.topology.atoms(),
715 self.indices,
716 self.k)
718 simulation, integrator = self.setup_sim(system, dt=0.002)
720 simulation.context.setPositions(self.coordinate.positions)
721 state = simulation.context.getState(getEnergy=True)
722 print(f'Energy before minimization: {state.getPotentialEnergy()}')
723 simulation.minimizeEnergy()
724 state = simulation.context.getState(getEnergy=True)
725 print(f'Energy after minimization: {state.getPotentialEnergy()}')
727 simulation.reporters.append(StateDataReporter(str(self.eq_log),
728 self.eq_freq,
729 step=True,
730 potentialEnergy=True,
731 speed=True,
732 temperature=True))
733 simulation.reporters.append(DCDReporter(str(self.eq_dcd), self.eq_freq))
735 simulation, integrator = self._heating(simulation, integrator)
736 simulation = self._equilibrate(simulation)
738 return simulation
740 def production(self,
741 chkpt: PathLike,
742 restart: bool=False) -> None:
743 """
744 Performs production MD. Loads a new system, integrator and simulation object
745 and loads equilibration or past production checkpoint to allow either the
746 continuation of a past simulation or to inherit the outputs of equilibration.
748 Arguments:
749 chkpt (PathLike): The checkpoint file to load. Should be either the equilibration
750 checkpoint or a past production checkpoint.
751 restart (bool): Defaults to False. Flag to ensure we log the full simulation
752 to reporter log. Otherwise restarts will overwrite the original log file.
753 """
754 system = self.load_system()
755 simulation, _ = self.setup_sim(system, dt=0.004)
757 simulation.context.reinitialize(True)
759 if restart:
760 log_file = open(str(self.prod_log), 'a')
761 else:
762 log_file = str(self.prod_log)
764 simulation = self.load_checkpoint(simulation, chkpt)
765 simulation = self.attach_reporters(simulation,
766 self.dcd,
767 log_file,
768 str(self.restart),
769 restart=restart)
771 self.simulation = self._production(simulation) # save simulation object
773class CustomForcesSimulator(Simulator):
774 """
775 Simulator for utilizing custom user-defined forces. Inherits from Simulator while
776 providing a way to inject custom forces.
778 Arguments:
779 path (PathLike): Path to simulation inputs, same as output path.
780 custom_force_objects (list): ??
781 equil_steps (int): Defaults to 1,250,000 (2.5 ns). Number of simulation timesteps to
782 perform equilibration for (2fs timestep).
783 prod_steps (int): Defaults to 250,000,000 (1 µs). Number of simulation timesteps to
784 perform production MD for (4fs timestep).
785 n_equil_cycles (int): Defaults to 3 cycles. Number of additional unrestrained
786 equilibration cycles to perform. Increasing this may triage unstable systems
787 at the cost of more simulation time albeit this may be negligible in the
788 grand scheme of things.
789 reporter_frequency (int): Defaults to 1,000 timesteps. How often to write out to
790 the logs and trajectory files in equilibration. X times less frequently during
791 production MD.
792 platform (str): Defaults to CUDA. Which OpenMM platform to simulate with (options
793 include CUDA, CPU, OpenCL).
794 device_ids (list[int]): Defaults to [0]. Which accelerators to use (multiple GPU
795 support is tenuous with OpenMM). Primarily used to distribute jobs to different
796 GPUs on a node of an HPC resource.
797 force_constant (float): Defaults to 10.0 kcal/mol*Å^2. Force constant to use for
798 harmonic restraints during equilibration. Currently restraints are only applied
799 to protein backbone atoms.
800 """
801 def __init__(self,
802 path: str,
803 custom_force_objects: list,
804 equil_steps: int=1_250_000,
805 prod_steps: int=250_000_000,
806 n_equil_cycles: int=3,
807 reporter_frequency: int=1_000,
808 platform: str='CUDA',
809 device_ids: list[int]=[0],
810 equilibration_force_constant: float=10.):
811 super().__init__(path, equil_steps, prod_steps, n_equil_cycles,
812 reporter_frequency, platform, device_ids,
813 equilibration_force_constant)
814 self.custom_forces = custom_force_objects
816 def load_amber_files(self) -> System:
817 """
818 Loads OpenMM system and adds in custom forces.
820 Returns:
821 (System): OpenMM system object with custom forces.
822 """
823 if not hasattr(self, 'coordinate'):
824 self.coor_file = self.path / 'system.inpcrd'
825 self.top_file = self.path / 'system.prmtop'
826 self.coordinate = AmberInpcrdFile(str(self.coor_file))
827 self.topology = AmberPrmtopFile(str(self.top_file),
828 periodicBoxVectors=self.coordinate.boxVectors)
830 system = self.topology.createSystem(nonbondedMethod=PME,
831 removeCMMotion=False,
832 nonbondedCutoff=1. * nanometer,
833 constraints=HBonds,
834 hydrogenMass=1.5 * amu)
836 system = self.add_forces(system)
838 return system
840 def add_forces(self,
841 system: System) -> System:
842 """
843 Systematically adds all custom forces to provided system.
845 Arguments:
846 system (System): OpenMM system object.
848 Returns:
849 (System): OpenMM system object with custom forces added.
850 """
851 for custom_force in self.custom_forces:
852 system.addForce(custom_force)
854 return system
856class Minimizer:
857 """
858 Class for just performing energy minimization and writing out minimized structures.
860 Arguments:
861 topology (PathLike): Topology file which can be either a PDB or an AMBER prmtop.
862 coordinates (OptPath): Defaults to None. If using a prmtop, you need to provide
863 an inpcrd file for the coordinates. Otherwise these are also acquired from
864 the PDB used as topology.
865 out (PathLike): Defaults to min.pdb. The name of the output minimized PDB file.
866 The path is inherited from the parent path of the topology file.
867 platform (str): Defaults to OpenCL. Which OpenMM platform to run.
868 device_ids (list[int]): Accelerator IDs.
869 """
870 def __init__(self,
871 topology: PathLike,
872 coordinates: PathLike,
873 out: PathLike='min.pdb',
874 platform: str='OpenCL',
875 device_ids: list[int] | None =[0]):
876 self.topology = Path(topology)
877 self.coordinates = Path(coordinates)
879 self.path = self.topology.parent
880 self.out = self.path / out
881 self.platform = Platform.getPlatformByName(platform)
882 self.properties = {'Precision': 'mixed'}
884 if device_ids is not None:
885 self.properties.update({'DeviceIndex': ','.join([str(x) for x in device_ids])})
887 def minimize(self) -> None:
888 """
889 Loads an OpenMM system, builds simulation object and runs energy minimization.
890 Dumps final coordinates into output PDB file.
892 Returns:
893 None
894 """
895 system = self.load_files()
896 integrator = LangevinMiddleIntegrator(300*kelvin,
897 1/picosecond,
898 0.001*picoseconds)
899 simulation = Simulation(self.topology,
900 system,
901 integrator,
902 self.platform,
903 self.properties)
905 simulation.context.setPositions(self.coordinates.positions)
907 simulation.minimizeEnergy()
909 state = simulation.context.getState(getPositions=True)
910 positions = state.getPositions()
912 PDBFile.writeFile(self.topology.topology,
913 positions,
914 file=str(self.out),
915 keepIds=True)
917 def load_files(self) -> None:
918 """
919 Loads an OpenMM system depending on what file types you have provided
920 for the topology (AMBER, PDB, etc).
922 Returns:
923 None
924 """
925 if self.topology.suffix in ['.prmtop', '.parm7']:
926 system = self.load_amber()
927 elif self.topology.suffix == '.top':
928 system = self.load_gromacs()
929 elif self.topology.suffix == '.pdb':
930 system = self.load_pdb()
931 else:
932 raise FileNotFoundError('No viable simulation input files found'
933 f'at path: {self.path}!')
935 return system
937 def load_amber(self) -> System:
938 """
939 Loads AMBER input files into OpenMM System.
941 Returns:
942 (System): OpenMM system object.
943 """
944 self.coordinates = AmberInpcrdFile(str(self.coordinates))
945 self.topology = AmberPrmtopFile(str(self.topology),
946 periodicBoxVectors=self.coordinates.boxVectors)
948 system = self.topology.createSystem(nonbondedMethod=NoCutoff,
949 constraints=HBonds)
951 return system
953 def load_gromacs(self) -> System:
954 """
955 Loads GROMACS input files into OpenMM System. UNTESTED!
957 Returns:
958 (System): OpenMM system object.
959 """
960 gro = list(self.path.glob('*.gro'))[0]
961 self.coordinates = GromacsGroFile(str(gro))
962 self.topology = GromacsTopFile(str(self.topology),
963 includeDir='/usr/local/gromacs/share/gromacs/top')
965 system = self.topology.createSystem(nonbondedMethod=NoCutoff,
966 constraints=HBonds)
968 return system
970 def load_pdb(self) -> System:
971 """
972 Loads PDB into OpenMM System. Beware the topology guesser.
974 Returns:
975 (System): OpenMM system object.
976 """
977 self.coordinates = PDBFile(str(self.topology))
978 self.topology = self.coordinates.topology
979 forcefield = ForceField('amber14-all.xml')
981 system = forcefield.createSystem(self.topology,
982 nonbondedMethod=NoCutoff,
983 constraints=HBonds)
985 return system