Multi GPU training with DDP

Authors: Suraj Subramanian

What you will learn
- How to migrate a single-GPU training script to multi-GPU via DDP
- Setting up the distributed process group
- Saving and loading models in a distributed setup

View the code used in this tutorial on GitHub.

Prerequisites
- High-level overview of how DDP works
- A machine with multiple GPUs (this tutorial uses an AWS p3.8xlarge instance)
- PyTorch installed with CUDA

Follow along with the accompanying video on YouTube.

In the previous tutorial, we got a high-level overview of how DDP works; now we see how to use DDP in code.
In this tutorial, we start with a single-GPU training script and migrate it to run on 4 GPUs on a single node.
Along the way, we will talk through important concepts in distributed training while implementing them in our code.

Note: If your model contains any BatchNorm layers, convert them to SyncBatchNorm so that the running stats of the BatchNorm layers are synchronized across replicas. The helper function torch.nn.SyncBatchNorm.convert_sync_batchnorm(model) converts every BatchNorm layer in the model to SyncBatchNorm. Assign its return value (model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)), and do the conversion before wrapping the model in DDP.

Diff for single_gpu.py vs. multigpu.py

These are the changes you typically make to a single-GPU training script to enable DDP.

Imports

- torch.multiprocessing is a PyTorch wrapper around Python's native multiprocessing.
- The distributed process group contains all the processes that can communicate and synchronize with each other.

```diff
  import torch
  import torch.nn.functional as F
  from utils import MyTrainDataset

+ import torch.multiprocessing as mp
+ from torch.utils.data.distributed import DistributedSampler
+ from torch.nn.parallel import DistributedDataParallel as DDP
+ from torch.distributed import init_process_group, destroy_process_group
+ import os
```

Constructing the process group

- First, call set_device before initializing the process group. It sets the default GPU for each process, which prevents hangs or excessive memory utilization on GPU:0.
- The process group can be initialized over TCP or from a shared file system. TCP is the default, and here it is configured through the MASTER_ADDR and MASTER_PORT environment variables. Read more on process group initialization.
- init_process_group initializes the distributed process group.
- Read more about choosing a DDP backend.

```diff
+ def ddp_setup(rank: int, world_size: int):
+     """
+     Args:
+         rank: Unique identifier of each process
+         world_size: Total number of processes
+     """
+     os.environ["MASTER_ADDR"] = "localhost"
+     os.environ["MASTER_PORT"] = "12355"
+     torch.cuda.set_device(rank)
+     init_process_group(backend="nccl", rank=rank, world_size=world_size)
```

Constructing the DDP model

DDP does not move the module to the GPU for you, so keep the .to(gpu_id) call and wrap the already-moved model:

```diff
  self.model = model.to(gpu_id)
+ self.model = DDP(self.model, device_ids=[gpu_id])
```

Distributing input data

- DistributedSampler chunks the input data across all distributed processes.
- Each process receives an input batch of 32 samples, so the effective batch size is 32 * nprocs, or 128 when using 4 GPUs.
- shuffle is set to False because DataLoader does not accept shuffle=True together with a custom sampler. DistributedSampler shuffles by default.

```diff
  train_data = torch.utils.data.DataLoader(
      dataset=train_dataset,
      batch_size=32,
-     shuffle=True,
+     shuffle=False,
+     sampler=DistributedSampler(train_dataset),
  )
```

- Call the set_epoch() method on the DistributedSampler at the beginning of each epoch so that shuffling works properly across multiple epochs. Otherwise, every epoch uses the same ordering.

```diff
  def _run_epoch(self, epoch):
      b_sz = len(next(iter(self.train_data))[0])
+     self.train_data.sampler.set_epoch(epoch)
      for source, targets in self.train_data:
          ...
          self._run_batch(source, targets)
```

Saving model checkpoints

- We only need to save model checkpoints from one process. Without this condition, each process would save its own copy of the identical model. Read more on saving and loading models with DDP.
- DDP keeps the original model in its module attribute. Saving self.model.module.state_dict() stores the plain model's weights, without the "module." prefix that DDP adds to every key.

```diff
- ckp = self.model.state_dict()
+ ckp = self.model.module.state_dict()
  ...
  ...
- if epoch % self.save_every == 0:
+ if self.gpu_id == 0 and epoch % self.save_every == 0:
      self._save_checkpoint(epoch)
```

Warning: Collective calls are functions that run on all the distributed processes. They exchange or aggregate state across processes, for example by gathering certain values onto a specific process. Every rank must run the collective code; if one rank skips a collective call, the other ranks block while they wait for it.
In this example, _save_checkpoint should not contain any collective calls, because it only runs on the rank 0 process.
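To make the hang concrete without needing GPUs, here is a small pure-Python simulation. It is not PyTorch code. A threading.Barrier stands in for a collective such as all_reduce, because both block until every participant arrives. Each thread plays one rank.

In the buggy variant, only rank 0 enters the "collective", because it sits inside the rank-0-only branch. This mirrors what would happen if _save_checkpoint contained a collective call. The function name run_ranks and the one-second timeout are hypothetical choices for this sketch.

```python
import threading


def run_ranks(world_size: int, collective_on_all_ranks: bool, timeout: float = 1.0) -> set:
    """Simulate world_size ranks and return the ranks that finished without hanging.

    Assumption for this sketch: a threading.Barrier models a collective call,
    since it only releases once all world_size participants have called wait().
    """
    barrier = threading.Barrier(world_size, timeout=timeout)
    finished = set()
    lock = threading.Lock()

    def worker(rank: int) -> None:
        try:
            if collective_on_all_ranks:
                # Correct ordering: every rank runs the collective first ...
                barrier.wait()
                if rank == 0:
                    pass  # ... then only rank 0 does its private work (e.g. saving a checkpoint)
            elif rank == 0:
                # Bug: the collective is hidden behind the rank-0 check,
                # so rank 0 waits for peers that never call it.
                barrier.wait()
        except threading.BrokenBarrierError:
            # The barrier timed out; a real job would typically stall here instead.
            return
        with lock:
            finished.add(rank)

    threads = [threading.Thread(target=worker, args=(r,)) for r in range(world_size)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return finished


print(run_ranks(4, collective_on_all_ranks=True))   # {0, 1, 2, 3}
print(run_ranks(4, collective_on_all_ranks=False))  # {1, 2, 3}: rank 0 is stuck in the collective
```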
If you need to make any collective calls, make them before the if self.gpu_id == 0 check.

Running the distributed training job

- Include the new arguments rank (replacing device) and world_size.
- rank is supplied automatically by mp.spawn, which passes each process's index (0 to nprocs - 1) as the first argument to main.
- world_size is the number of processes across the training job. For GPU training, this corresponds to the number of GPUs in use, and each process works on a dedicated GPU.

```diff
- def main(device, total_epochs, save_every):
+ def main(rank, world_size, total_epochs, save_every):
+     ddp_setup(rank, world_size)
      dataset, model, optimizer = load_train_objs()
      train_data = prepare_dataloader(dataset, batch_size=32)
-     trainer = Trainer(model, train_data, optimizer, device, save_every)
+     trainer = Trainer(model, train_data, optimizer, rank, save_every)
      trainer.train(total_epochs)
+     destroy_process_group()

  if __name__ == "__main__":
      import sys
      total_epochs = int(sys.argv[1])
      save_every = int(sys.argv[2])
-     device = 0      # shorthand for cuda:0
-     main(device, total_epochs, save_every)
+     world_size = torch.cuda.device_count()
+     mp.spawn(main, args=(world_size, total_epochs, save_every,), nprocs=world_size)
```

Further Reading
- Fault Tolerant distributed training (next tutorial in this series)
- Intro to DDP (previous tutorial in this series)
- Getting Started with DDP
- Process Group initialization