Coverage for src/molecular_simulations/utils/parsl_settings.py: 100% (60 statements)
coverage.py v7.13.0, created at 2025-12-13 01:26 -0600

#!/usr/bin/env python
from abc import ABC, abstractmethod
import json
from parsl.config import Config
from parsl.providers import LocalProvider, PBSProProvider
from parsl.executors import HighThroughputExecutor
from parsl.launchers import MpiExecLauncher, GnuParallelLauncher
from parsl.addresses import address_by_interface, address_by_hostname
from parsl.utils import get_all_checkpoints
from pathlib import Path
from pydantic import BaseModel
from typing import List, Sequence, Tuple, Type, TypeVar, Union
import yaml

PathLike = Union[str, Path]
_T = TypeVar("_T")

class BaseSettings(BaseModel):
    def dump_yaml(self, filename: PathLike) -> None:
        """Serialize the model to a YAML file (via its JSON representation)."""
        with open(filename, mode="w") as fp:
            yaml.dump(json.loads(self.json()), fp, indent=4, sort_keys=False)

    @classmethod
    def from_yaml(cls: Type[_T], filename: PathLike) -> _T:
        """Construct an instance from a YAML file."""
        with open(filename) as fp:
            raw_data = yaml.safe_load(fp)
        return cls(**raw_data)  # type: ignore
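
# Illustrative usage sketch (not part of the original module): round-trip a
# settings object through YAML with dump_yaml/from_yaml. The file name below
# is hypothetical.
def _example_yaml_round_trip() -> "LocalSettings":
    settings = LocalSettings(retries=2)
    settings.dump_yaml("local_settings.yaml")  # write the model out as YAML
    return LocalSettings.from_yaml("local_settings.yaml")  # rebuild the model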

class BaseComputeSettings(ABC, BaseSettings):
    """Compute settings (HPC platform, number of GPUs, etc)."""

    @abstractmethod
    def config_factory(self, run_dir: PathLike) -> Config:
        """Create a new Parsl configuration."""

class LocalSettings(BaseComputeSettings):
    available_accelerators: Union[int, Sequence[str]] = 4
    retries: int = 1
    label: str = 'htex'
    worker_port_range: Tuple[int, int] = (10000, 20000)

    def config_factory(self, run_dir: PathLike) -> Config:
        return Config(
            run_dir=str(run_dir),
            retries=self.retries,
            executors=[
                HighThroughputExecutor(
                    address="127.0.0.1",
                    label=self.label,
                    cpu_affinity="block",
                    available_accelerators=self.available_accelerators,
                    worker_port_range=self.worker_port_range,
                    provider=LocalProvider(init_blocks=1, max_blocks=1),
                ),
            ],
        )
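
# Illustrative usage sketch (not part of the original module): build the local
# configuration and register it with Parsl so subsequent apps can run on it.
def _example_load_local(run_dir: PathLike = "runinfo") -> Config:
    import parsl
    config = LocalSettings(available_accelerators=2).config_factory(run_dir)
    parsl.load(config)  # activate this config for subsequent Parsl apps
    return config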

class PolarisSettings(BaseComputeSettings):
    label: str = 'htex'
    num_nodes: int = 1
    worker_init: str = ''
    scheduler_options: str = ''
    account: str
    queue: str
    walltime: str
    cpus_per_node: int = 64
    strategy: str = 'simple'
    available_accelerators: Union[int, Sequence[str]] = 4

    def config_factory(self, run_dir: PathLike) -> Config:
        """Create a configuration for running all tasks on single nodes of Polaris.

        Launches four workers per node, each pinned to a different GPU.

        Args:
            run_dir: Directory in which to store Parsl run files.
        """
        return Config(
            retries=1,  # Allows restarts if tasks are killed at the end of a job
            executors=[
                HighThroughputExecutor(
                    label=self.label,
                    heartbeat_period=15,
                    heartbeat_threshold=120,
                    worker_debug=True,
                    available_accelerators=self.available_accelerators,
                    address=address_by_hostname(),
                    cpu_affinity="alternating",
                    prefetch_capacity=0,  # Increase if you have many more tasks than workers
                    provider=PBSProProvider(  # type: ignore[no-untyped-call]
                        launcher=MpiExecLauncher(
                            bind_cmd="--cpu-bind", overrides="--depth=64 --ppn 1"
                        ),  # Updates to the mpiexec command
                        account=self.account,
                        queue=self.queue,
                        select_options="ngpus=4",
                        # PBS directives (header lines); for array jobs pass the '-J' option
                        scheduler_options=self.scheduler_options,
                        worker_init=self.worker_init,
                        nodes_per_block=self.num_nodes,
                        init_blocks=1,
                        min_blocks=0,
                        max_blocks=1,  # Increase to allow more parallel jobs
                        cpus_per_node=self.cpus_per_node,
                        walltime=self.walltime,
                    ),
                ),
            ],
            run_dir=str(run_dir),
            strategy=self.strategy,
            app_cache=True,
        )
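
# Illustrative usage sketch (not part of the original module): PolarisSettings
# requires an account, queue, and walltime; the values below are placeholders.
def _example_polaris_config(run_dir: PathLike = "runinfo") -> Config:
    settings = PolarisSettings(
        account="MY_ALLOCATION",  # hypothetical PBS project name
        queue="debug",
        walltime="01:00:00",
        num_nodes=2,
    )
    return settings.config_factory(run_dir)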

class AuroraSettings(BaseComputeSettings):
    label: str = 'htex'
    worker_init: str = ""
    num_nodes: int = 1
    scheduler_options: str = ""
    account: str
    queue: str
    walltime: str
    retries: int = 0
    cpus_per_node: int = 48  # only 4 CPUs per OpenMM job
    strategy: str = "simple"
    available_accelerators: List[str] = [str(i) for i in range(12)]

    def config_factory(self, run_dir: PathLike) -> Config:
        """Create a Parsl configuration for running on Aurora."""
        return Config(
            executors=[
                HighThroughputExecutor(
                    label=self.label,
                    available_accelerators=self.available_accelerators,
                    cpu_affinity="block",  # Assigns CPUs in sequential order
                    prefetch_capacity=0,
                    max_workers=12,
                    cores_per_worker=16,
                    heartbeat_period=30,
                    heartbeat_threshold=300,
                    worker_debug=False,
                    provider=PBSProProvider(
                        launcher=MpiExecLauncher(
                            bind_cmd="--cpu-bind",
                            overrides="--depth=208 --ppn 1",
                        ),  # Ensures one manager per node, dividing work among all 208 threads
                        worker_init=self.worker_init,
                        nodes_per_block=self.num_nodes,
                        account=self.account,
                        queue=self.queue,
                        walltime=self.walltime,
                    ),
                ),
            ],
            run_dir=str(run_dir),
            checkpoint_mode='task_exit',
            retries=self.retries,
            app_cache=True,
        )
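
# Illustrative usage sketch (not part of the original module): pick a settings
# class by platform name and build its Config from a YAML file. The mapping
# and file path are assumptions for this example, not part of the module API.
_EXAMPLE_PLATFORMS = {
    "local": LocalSettings,
    "polaris": PolarisSettings,
    "aurora": AuroraSettings,
}

def _example_config_from_yaml(platform: str, path: PathLike, run_dir: PathLike) -> Config:
    settings = _EXAMPLE_PLATFORMS[platform].from_yaml(path)
    return settings.config_factory(run_dir)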