Coverage for src/molecular_simulations/utils/parsl_settings.py: 100% (60 statements)
#!/usr/bin/env python
from abc import ABC, abstractmethod
import json
from pathlib import Path
from typing import List, Sequence, Tuple, Type, TypeVar, Union

import yaml
from parsl.addresses import address_by_hostname
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.launchers import MpiExecLauncher
from parsl.providers import LocalProvider, PBSProProvider
from pydantic import BaseModel

PathLike = Union[str, Path]
_T = TypeVar("_T")


class BaseSettings(BaseModel):
    """Base settings object with YAML (de)serialization helpers."""

    def dump_yaml(self, filename: PathLike) -> None:
        """Write the settings to a YAML file."""
        with open(filename, mode="w") as fp:
            yaml.dump(json.loads(self.json()), fp, indent=4, sort_keys=False)

    @classmethod
    def from_yaml(cls: Type[_T], filename: PathLike) -> _T:
        """Load settings from a YAML file."""
        with open(filename) as fp:
            raw_data = yaml.safe_load(fp)
        return cls(**raw_data)  # type: ignore
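
# A minimal round-trip sketch (commented out to keep the module import-safe);
# the subclass and file name below are illustrative, not part of this module's API:
#
#     settings = LocalSettings(available_accelerators=2)
#     settings.dump_yaml("compute.yaml")           # writes the fields as YAML
#     restored = LocalSettings.from_yaml("compute.yaml")
#     assert restored == settings                  # pydantic models compare field-by-field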


class BaseComputeSettings(ABC, BaseSettings):
    """Compute settings (HPC platform, number of GPUs, etc.)."""

    @abstractmethod
    def config_factory(self, run_dir: PathLike) -> Config:
        """Create a new Parsl configuration."""


class LocalSettings(BaseComputeSettings):
    """Compute settings for running on a local workstation."""

    available_accelerators: Union[int, Sequence[str]] = 4
    retries: int = 1
    label: str = 'htex'
    worker_port_range: Tuple[int, int] = (10000, 20000)

    def config_factory(self, run_dir: PathLike) -> Config:
        """Create a Parsl configuration for running on the local machine."""
        return Config(
            run_dir=str(run_dir),
            retries=self.retries,
            executors=[
                HighThroughputExecutor(
                    address="127.0.0.1",
                    label=self.label,
                    cpu_affinity="block",
                    available_accelerators=self.available_accelerators,
                    worker_port_range=self.worker_port_range,
                    provider=LocalProvider(init_blocks=1, max_blocks=1),
                ),
            ],
        )
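
# Typical use, sketched under the assumption that Parsl is driving the workflow;
# ``parsl.load`` accepts the Config returned by ``config_factory``:
#
#     import parsl
#     cfg = LocalSettings(available_accelerators=["0", "1"]).config_factory("runinfo")
#     parsl.load(cfg)
#     ... define and launch @python_app tasks ...
#     parsl.dfk().cleanup()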


class PolarisSettings(BaseComputeSettings):
    label: str = 'htex'
    num_nodes: int = 1
    worker_init: str = ''
    scheduler_options: str = ''
    account: str
    queue: str
    walltime: str
    cpus_per_node: int = 64
    strategy: str = 'simple'
    available_accelerators: Union[int, Sequence[str]] = 4

    def config_factory(self, run_dir: PathLike) -> Config:
        """Create a configuration for running on single nodes of Polaris.

        Launches four workers per node, each pinned to a different GPU.

        Args:
            run_dir: Directory in which to store Parsl run files.
        """
        return Config(
            retries=1,  # Allows restarts if a job is killed at the end of its walltime
            executors=[
                HighThroughputExecutor(
                    label=self.label,
                    heartbeat_period=15,
                    heartbeat_threshold=120,
                    worker_debug=True,
                    available_accelerators=self.available_accelerators,
                    address=address_by_hostname(),
                    cpu_affinity="alternating",
                    prefetch_capacity=0,  # Increase if you have many more tasks than workers
                    provider=PBSProProvider(  # type: ignore[no-untyped-call]
                        launcher=MpiExecLauncher(
                            bind_cmd="--cpu-bind", overrides="--depth=64 --ppn 1"
                        ),  # Updates to the mpiexec command
                        account=self.account,
                        queue=self.queue,
                        select_options="ngpus=4",
                        # PBS directives (header lines): for array jobs pass the '-J' option
                        scheduler_options=self.scheduler_options,
                        worker_init=self.worker_init,
                        nodes_per_block=self.num_nodes,
                        init_blocks=1,
                        min_blocks=0,
                        max_blocks=1,  # Increase to allow more parallel jobs
                        cpus_per_node=self.cpus_per_node,
                        walltime=self.walltime,
                    ),
                ),
            ],
            run_dir=str(run_dir),
            strategy=self.strategy,
            app_cache=True,
        )
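
# Example of a YAML file that ``PolarisSettings.from_yaml`` could consume; the
# account, queue, and module names are placeholders, not real defaults:
#
#     account: MY_ALLOCATION
#     queue: debug
#     walltime: "01:00:00"
#     num_nodes: 2
#     worker_init: "module load conda; conda activate sim-env"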


class AuroraSettings(BaseComputeSettings):
    label: str = 'htex'
    worker_init: str = ""
    num_nodes: int = 1
    scheduler_options: str = ""
    account: str
    queue: str
    walltime: str
    retries: int = 0
    cpus_per_node: int = 48  # only 4 CPUs per OpenMM job
    strategy: str = "simple"
    available_accelerators: List[str] = [str(i) for i in range(12)]

    def config_factory(self, run_dir: PathLike) -> Config:
        """Create a Parsl configuration for running on Aurora."""
        return Config(
            executors=[
                HighThroughputExecutor(
                    label=self.label,
                    available_accelerators=self.available_accelerators,
                    cpu_affinity="block",  # Assigns CPUs in sequential order
                    prefetch_capacity=0,
                    max_workers=12,
                    cores_per_worker=16,
                    heartbeat_period=30,
                    heartbeat_threshold=300,
                    worker_debug=False,
                    provider=PBSProProvider(
                        launcher=MpiExecLauncher(
                            bind_cmd="--cpu-bind",
                            overrides="--depth=208 --ppn 1",
                        ),  # Ensures one manager per node, dividing work among all 208 threads
                        worker_init=self.worker_init,
                        nodes_per_block=self.num_nodes,
                        account=self.account,
                        queue=self.queue,
                        walltime=self.walltime,
                    ),
                ),
            ],
            run_dir=str(run_dir),
            checkpoint_mode='task_exit',
            retries=self.retries,
            app_cache=True,
        )
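

# A small smoke test, assuming the module is run directly; the file name
# "local_settings.yaml" is illustrative.
if __name__ == "__main__":
    settings = LocalSettings(available_accelerators=2)
    settings.dump_yaml("local_settings.yaml")
    restored = LocalSettings.from_yaml("local_settings.yaml")
    print(restored.config_factory(run_dir="runinfo"))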