참고Clickhereto download the full example code분산 데이터 병렬 처리와 병렬 처리 파이프라인을 사용한 트랜스포머 모델 학습¶Author:Pritam Damania번역:백선희이 튜토리얼은분산 데이터 병렬처리(Distributed Data Parallel)와병렬 처리 파이프라인를 사용하여 여러 GPU에 걸친 거대한 트랜스포머(Transformer) 모델을 어떻게 학습시키는지 보여줍니다.
이번 튜토리얼은nn.Transformer 와 TorchText 로 시퀀스-투-시퀀스(Sequence-to-Sequence) 모델링하기의
확장판이며 분산 데이터 병렬 처리와 병렬 처리 파이프라인이 어떻게 트랜스포머 모델 학습에 쓰이는지 보여주기 위해 이전 튜토리얼에서의
모델 규모를 증가시켰습니다.선수과목(Prerequisites):Pipeline Parallelismnn.Transformer 와 TorchText 로 시퀀스-투-시퀀스(Sequence-to-Sequence) 모델링하기분산 데이터 병렬 처리 시작하기모델 정의하기¶PositionalEncoding모듈은 시퀀스에서 토큰의 상대적, 절대적 위치에 대한
몇몇 정보를 주입합니다.
위치 인코딩은 임베딩과 같은 차원을 가지므로
둘을 합칠 수 있습니다. 여기서, 주파수가 다른sine과cosine함수를
사용합니다.importsysimportosimportmathimporttorchimporttorch.nnasnnimporttorch.nn.functionalasFimporttempfilefromtorch.nnimportTransformerEncoder,TransformerEncoderLayerclassPositionalEncoding(nn.Module):def__init__(self,d_model,dropout=0.1,max_len=5000):super(PositionalEncoding,self).__init__()self.dropout=nn.Dropout(p=dropout)pe=torch.zeros(max_len,d_model)position=torch.arange(0,max_len,dtype=torch.float).unsqueeze(1)div_term=torch.exp(torch.arange(0,d_model,2).float()*(-math.log(10000.0)/d_model))pe[:,0::2]=torch.sin(position*div_term)pe[:,1::2]=torch.cos(position*div_term)pe=pe.unsqueeze(0).transpose(0,1)self.pe=nn.Parameter(pe,requires_grad=False)defforward(self,x):x=x+self.pe[:x.size(0),:]returnself.dropout(x)이번 튜토리얼에서는, 트랜스포머 모델을 두 개의 GPU에 걸쳐서 나누고
병렬 처리 파이프라인으로 학습시켜 보겠습니다. 추가로,분산 데이터 병렬 처리를 사용하여 이 파이프라인의 두 복제본(replica)을 훈련시킵니다. 한 프로세스는
GPUs 0, 1에 거쳐 파이프를 구동하고 다른 프로세스는 GPUs 2, 3에서 파이프를 구동합니다. 그 다음, 이 두
프로세스는 분산 데이터 병렬처리로 두 복제본(replica)을 학습시킵니다.
모델은 바로nn.Transformer 와 TorchText 로 시퀀스-투-시퀀스(Sequence-to-Sequence) 모델링하기튜토리얼과
똑같은 모델이지만 두 단계로 나뉩니다. 대부분 파라미터(parameter)들은nn.TransformerEncoder계층(layer)에 포함됩니다.nn.TransformerEncoder는nn.TransformerEncoderLayer의nlayers로 구성되어 있습니다.
결과적으로, 이 튜토리얼에서는nn.TransformerEncoder에 중점을 두고 있으며nn.TransformerEncoderLayer의 절반은 한 GPU에 두고
나머지 절반은 다른 GPU에 있도록 모델을 분할합니다. 이를 위해서Encoder와Decoder섹션을 분리된 모듈로 빼낸 다음, 원본 트랜스포머 모듈을
나타내는nn.Sequential을 빌드 합니다.ifsys.platform=='win32':print('Windows platform is not supported for pipeline parallelism')sys.exit(0)iftorch.cuda.device_count()<4:print('Need at least four GPU devices for this tutorial')sys.exit(0)classEncoder(nn.Module):def__init__(self,ntoken,ninp,dropout=0.5):super(Encoder,self).__init__()self.pos_encoder=PositionalEncoding(ninp,dropout)self.encoder=nn.Embedding(ntoken,ninp)self.ninp=ninpself.init_weights()definit_weights(self):initrange=0.1self.encoder.weight.data.uniform_(-initrange,initrange)defforward(self,src):# Need (S, N) format for encoder.src=src.t()src=self.encoder(src)*math.sqrt(self.ninp)returnself.pos_encoder(src)classDecoder(nn.Module):def__init__(self,ntoken,ninp):super(Decoder,self).__init__()self.decoder=nn.Linear(ninp,ntoken)self.init_weights()definit_weights(self):initrange=0.1self.decoder.bias.data.zero_()self.decoder.weight.data.uniform_(-initrange,initrange)defforward(self,inp):# Need batch dimension first for output of pipeline.returnself.decoder(inp).permute(1,0,2)학습을 위한 다중 프로세스 시작¶각각 두 개의 GPU에서 자체 파이프라인을 구동하는 두 개의 프로세스를 시작합니다.run_worker는 각 프로세스에 실행됩니다.defrun_worker(rank,world_size):데이터 로드하고 배치 만들기¶학습 프로세스는torchtext의 Wikitext-2 데이터셋을 사용합니다.
torchtext 데이터셋에 접근하기 전에,https://github.com/pytorch/data을 참고하여 torchdata를
설치하시기 바랍니다.
단어 오브젝트는 훈련 데이터셋으로 만들어지고, 토큰을 텐서(tensor)로 수치화하는데 사용됩니다.
시퀀스 데이터로부터 시작하여,batchify()함수는 데이터셋을 열(column)들로 정리하고,batch_size사이즈의 배치들로 나눈 후에 남은 모든 토큰을 버립니다.
예를 들어, 알파벳을 시퀀스(총 길이 26)로 생각하고 배치 사이즈를 4라고 한다면,
알파벳을 길이가 6인 4개의 시퀀스로 나눌 수 있습니다:\[ \begin{bmatrix}
\text{A} & \text{B} & \text{C} & \ldots & \text{X} & \text{Y} & \text{Z}
\end{bmatrix}
\Rightarrow
\begin{bmatrix}
\begin{bmatrix}\text{A} \\ \text{B} \\ \text{C} \\ \text{D} \\ \text{E} \\ \text{F}\end{bmatrix} &
\begin{bmatrix}\text{G} \\ \text{H} \\ \text{I} \\ \text{J} \\ \text{K} \\ \text{L}\end{bmatrix} &
\begin{bmatrix}\text{M} \\ \text{N} \\ \text{O} \\ \text{P} \\ \text{Q} \\ \text{R}\end{bmatrix} &
\begin{bmatrix}\text{S} \\ \text{T} \\ \text{U} \\ \text{V} \\ \text{W} \\ \text{X}\end{bmatrix}
\end{bmatrix}\]이 열들은 모델에 의해서 독립적으로 취급되며, 이는G와F의 의존성이 학습될 수 없다는 것을 의미하지만, 더 효율적인
배치 프로세싱(batch processing)을 허용합니다.# In 'run_worker'defprint_with_rank(msg):print('[RANK{}]:{}'.format(rank,msg))fromtorchtext.datasetsimportWikiText2fromtorchtext.data.utilsimportget_tokenizerfromtorchtext.vocabimportbuild_vocab_from_iteratortrain_iter=WikiText2(split='train')tokenizer=get_tokenizer('basic_english')vocab=build_vocab_from_iterator(map(tokenizer,train_iter),specials=["<unk>"])vocab.set_default_index(vocab["<unk>"])defdata_process(raw_text_iter):data=[torch.tensor(vocab(tokenizer(item)),dtype=torch.long)foriteminraw_text_iter]returntorch.cat(tuple(filter(lambdat:t.numel()>0,data)))train_iter,val_iter,test_iter=WikiText2()train_data=data_process(train_iter)val_data=data_process(val_iter)test_data=data_process(test_iter)device=torch.device(2*rank)defbatchify(data,bsz,rank,world_size,is_train=False):# Divide the dataset into ``bsz`` parts.nbatch=data.size(0)//bsz# Trim off any extra elements that wouldn't cleanly fit (remainders).data=data.narrow(0,0,nbatch*bsz)# Evenly divide the data across the ``bsz`` batches.data=data.view(bsz,-1).t().contiguous()# Divide the data across the ranks only for training data.ifis_train:data_per_rank=data.size(0)//world_sizedata=data[rank*data_per_rank:(rank+1)*data_per_rank]returndata.to(device)batch_size=20eval_batch_size=10train_data=batchify(train_data,batch_size,rank,world_size,True)val_data=batchify(val_data,eval_batch_size,rank,world_size)test_data=batchify(test_data,eval_batch_size,rank,world_size)입력과 타겟 시퀀스를 생성하기 위한 함수들¶get_batch()함수는 트랜스포머 모델을 위한 입력과 타겟 시퀀스를
생성합니다. 이 함수는 소스 데이터를bptt길이를 가진 덩어리로 세분화합니다.
언어 모델링 과제를 위해서, 모델은
다음 단어인Target이 필요합니다. 예를 들어bptt의 값이 2라면,i= 0 일 때 다음의 2 개 변수(Variable)를 얻을 수 있습니다:청크가 차원 0에 속하며
트랜스포머 모델의S차원과 일치한다는 것을 유의해야 합니다.
배치 차원N은 1 차원에 해당합니다.# In 'run_worker'bptt=35defget_batch(source,i):seq_len=min(bptt,len(source)-1-i)data=source[i:i+seq_len]target=source[i+1:i+1+seq_len].view(-1)# Need batch dimension first for pipeline parallelism.returndata.t(),target모델 규모와 파이프 초기화¶병렬 처리 파이프라인을 활용한 대형 트랜스포머 모델 학습을 보이기 위해,
트랜스포머 계층 규모를 적절히 확장시킵니다.
4096차원의 임베딩 벡터, 4096의 은닉 사이즈, 16개의 어텐션 헤드(attention head)와 총 8 개의
트랜스포머 계층 (nn.TransformerEncoderLayer)를 사용합니다. 이는 최대~1 억개의 파라미터를 갖는 모델을 생성합니다.RPC 프레임워크를 초기화해야 합니다.
Pipe가  향후 호스트 파이프라인을 교차 확장할 수 있도록 하는RRef를 통해
RPC 프레임워크에 의존하기 때문입니다.
이때 RPC 프레임워크는 오직 하나의 하나의 worker로 초기화를 해야 하는데,
여러 GPU를 다루기 위해 프로세스 하나만 사용하고 있기 때문입니다.그런 다음 파이프라인은 한 GPU에 8개의 트랜스포머와
다른 GPU에 8개의 트랜스포머 계층으로 초기화됩니다. 한 파이프는 GPU 0, 1에 거쳐 설정되고
다른 하나는 GPU 2, 3에 설정됩니다. 그런 다음 DistributedDataParallel을 사용하여 두 파이프가 모두 복제됩니다.# In 'run_worker'ntokens=len(vocab)# the size of vocabularyemsize=4096# embedding dimensionnhid=4096# the dimension of the feedforward network model in ``nn.TransformerEncoder``nlayers=8# the number of ``nn.TransformerEncoderLayer`` in ``nn.TransformerEncoder``nhead=16# the number of heads in the Multihead Attention modelsdropout=0.2# the dropout valuefromtorch.distributedimportrpctmpfile=tempfile.NamedTemporaryFile()rpc.init_rpc(name="worker",rank=0,world_size=1,rpc_backend_options=rpc.TensorPipeRpcBackendOptions(init_method="file://{}".format(tmpfile.name),# Specifying _transports and _channels is a workaround and we no longer# will have to specify _transports and _channels for PyTorch# versions >= 1.8.1_transports=["ibv","uv"],_channels=["cuda_ipc","cuda_basic"],))# Num gpus for model parallelism.num_gpus=2partition_len=((nlayers-1)//num_gpus)+1# Add encoder in the beginning.tmp_list=[Encoder(ntokens,emsize,dropout).cuda(2*rank)]module_list=[]# Add all the necessary transformer blocks.foriinrange(nlayers):transformer_block=TransformerEncoderLayer(emsize,nhead,nhid,dropout)ifi!=0andi%(partition_len)==0:module_list.append(nn.Sequential(*tmp_list))tmp_list=[]device=i//(partition_len)tmp_list.append(transformer_block.to(2*rank+device))# Add decoder in the end.tmp_list.append(Decoder(ntokens,emsize).cuda(2*rank+num_gpus-1))module_list.append(nn.Sequential(*tmp_list))# Need to use 'checkpoint=never' since as of PyTorch 1.8, Pipe checkpointing# doesn't work with DDP.fromtorch.distributed.pipeline.syncimportPipechunks=8model=Pipe(torch.nn.Sequential(*module_list),chunks=chunks,checkpoint="never")# Initialize process group and wrap model in DDP.fromtorch.nn.parallelimportDistributedDataParallelimporttorch.distributedasdistos.environ['MASTER_ADDR']='localhost'os.environ['MASTER_PORT']='29500'dist.init_process_group(backend="nccl",rank=rank,world_size=world_size)model=DistributedDataParallel(model)defget_total_params(module:torch.nn.Module):total_params=0forparaminmodule.parameters():total_params+=param.numel()returntotal_paramsprint_with_rank('Total parameters in model:{:,}'.format(get_total_params(model)))모델 실행하기¶손실(loss)을 추적하기 위해CrossEntropyLoss가
적용되며, 옵티마이저(optimizer)로서SGD는 확률적 경사하강법(stochastic gradient descent method)을 구현합니다. 초기
학습률(learning rate)은 5.0로 설정됩니다.StepLR는
에폭(epoch)에 따라서 학습률을 조절하는 데 사용됩니다. 학습하는 동안,
기울기 폭발(gradient exploding)을 방지하기 위해 모든 기울기를 함께 조정(scale)하는 함수nn.utils.clip_grad_norm_을 이용합니다.# In 'run_worker'criterion=nn.CrossEntropyLoss()lr=5.0# learning rateoptimizer=torch.optim.SGD(model.parameters(),lr=lr)scheduler=torch.optim.lr_scheduler.StepLR(optimizer,1.0,gamma=0.95)importtimedeftrain():model.train()# Turn on the train modetotal_loss=0.start_time=time.time()ntokens=len(vocab)# Train only for 50 batches to keep script execution time low.nbatches=min(50*bptt,train_data.size(0)-1)forbatch,iinenumerate(range(0,nbatches,bptt)):data,targets=get_batch(train_data,i)optimizer.zero_grad()# Since the Pipe is only within a single host and process the ``RRef``# returned by forward method is local to this node and can simply# retrieved via ``RRef.local_value()``.output=model(data).local_value()# Need to move targets to the device where the output of the# pipeline resides.loss=criterion(output.view(-1,ntokens),targets.cuda(2*rank+1))loss.backward()torch.nn.utils.clip_grad_norm_(model.parameters(),0.5)optimizer.step()total_loss+=loss.item()log_interval=10ifbatch%log_interval==0andbatch>0:cur_loss=total_loss/log_intervalelapsed=time.time()-start_timeprint_with_rank('| epoch{:3d}|{:5d}/{:5d}batches | ''lr{:02.2f}| ms/batch{:5.2f}| ''loss{:5.2f}| ppl{:8.2f}'.format(epoch,batch,nbatches//bptt,scheduler.get_last_lr()[0],elapsed*1000/log_interval,cur_loss,math.exp(cur_loss)))total_loss=0start_time=time.time()defevaluate(eval_model,data_source):eval_model.eval()# Turn on the evaluation modetotal_loss=0.ntokens=len(vocab)# Evaluate only for 50 batches to keep script execution time low.nbatches=min(50*bptt,data_source.size(0)-1)withtorch.no_grad():foriinrange(0,nbatches,bptt):data,targets=get_batch(data_source,i)output=eval_model(data).local_value()output_flat=output.view(-1,ntokens)# Need to move targets to the device where the output of the# pipeline resides.total_loss+=len(data)*criterion(output_flat,targets.cuda(2*rank+1)).item()returntotal_loss/(len(data_source)-1)에폭을 반복합니다. 만약 검증 오차(validation loss)가 지금까지 관찰한 것 중 최적이라면
모델을 저장합니다. 각 에폭 이후에 학습률을 조절합니다.# In 'run_worker'best_val_loss=float("inf")epochs=3# The number of epochsbest_model=Noneforepochinrange(1,epochs+1):epoch_start_time=time.time()train()val_loss=evaluate(model,val_data)print_with_rank('-'*89)print_with_rank('| end of epoch{:3d}| time:{:5.2f}s | valid loss{:5.2f}| ''valid ppl{:8.2f}'.format(epoch,(time.time()-epoch_start_time),val_loss,math.exp(val_loss)))print_with_rank('-'*89)ifval_loss<best_val_loss:best_val_loss=val_lossbest_model=modelscheduler.step()평가 데이터셋으로 모델 평가하기¶평가 데이터셋에서의 결과를 확인하기 위해 최고의 모델을 적용합니다.# In 'run_worker'test_loss=evaluate(best_model,test_data)print_with_rank('='*89)print_with_rank('| End of training | test loss{:5.2f}| test ppl{:8.2f}'.format(test_loss,math.exp(test_loss)))print_with_rank('='*89)# Main executionimporttorch.multiprocessingasmpif__name__=="__main__":world_size=2mp.spawn(run_worker,args=(world_size,),nprocs=world_size,join=True)Output¶[RANK0]:|epoch1|10/50batches|lr5.00|ms/batch778.97|loss43.31|ppl6432469059895903232.00[RANK1]:|epoch1|10/50batches|lr5.00|ms/batch778.90|loss44.50|ppl21245447128217366528.00[RANK0]:|epoch1|20/50batches|lr5.00|ms/batch699.89|loss44.50|ppl21176949187407757312.00[RANK1]:|epoch1|20/50batches|lr5.00|ms/batch699.87|loss44.62|ppl23975861229620961280.00[RANK0]:|epoch1|30/50batches|lr5.00|ms/batch698.86|loss41.62|ppl1193312915629888256.00[RANK1]:|epoch1|30/50batches|lr5.00|ms/batch698.87|loss40.69|ppl471605759847546240.00[RANK0]:|epoch1|40/50batches|lr5.00|ms/batch698.34|loss45.20|ppl42812308420836458496.00[RANK1]:|epoch1|40/50batches|lr5.00|ms/batch698.33|loss45.68|ppl68839569686012223488.00[RANK1]:-----------------------------------------------------------------------------------------[RANK1]:|endofepoch1|time:40.08s|validloss0.80|validppl2.22[RANK1]:-----------------------------------------------------------------------------------------[RANK0]:-----------------------------------------------------------------------------------------[RANK0]:|endofepoch1|time:40.09s|validloss0.80|validppl2.22[RANK0]:-----------------------------------------------------------------------------------------[RANK0]:|epoch2|10/50batches|lr4.75|ms/batch768.51|loss36.34|ppl6063529544668166.00[RANK1]:|epoch2|10/50batches|lr4.75|ms/batch769.23|loss37.41|ppl17651211266236086.00[RANK0]:|epoch2|20/50batches|lr4.75|ms/batch699.57|loss28.97|ppl3798441739584.11[RANK1]:|epoch2|20/50batches|lr4.75|ms/batch699.56|loss29.28|ppl5203636967575.47[RANK0]:|epoch2|30/50batches|lr4.75|ms/batch699.04|loss28.43|ppl2212498693571.25[RANK1]:|epoch2|30/50batches|lr4.75|ms/batch699.05|loss28.33|ppl2015144761281.48[RANK0]:|epoch2|40/50batches|lr4.75|ms/batch699.10|loss23.30|ppl13121380184.92[RANK1]:|epoch2|40/50batches|lr4.75|ms/batch699.09|loss23.41|ppl14653799192.87[RANK0]:-----------------------------------------------------------------------------------------[RANK0]:|endofepoch2|time:39.97s|validloss0.24|validppl1.27[RANK0]:-----------------------------------------------------------------------------------------[RANK1]:-----------------------------------------------------------------------------------------[RANK1]:|endofepoch2|time:39.98s|validloss0.24|validppl1.27[RANK1]:-----------------------------------------------------------------------------------------[RANK0]:|epoch3|10/50batches|lr4.51|ms/batch769.36|loss12.80|ppl361681.11[RANK1]:|epoch3|10/50batches|lr4.51|ms/batch768.97|loss12.57|ppl287876.61[RANK0]:|epoch3|20/50batches|lr4.51|ms/batch698.27|loss12.01|ppl164364.60[RANK1]:|epoch3|20/50batches|lr4.51|ms/batch698.30|loss11.98|ppl159095.89[RANK0]:|epoch3|30/50batches|lr4.51|ms/batch697.75|loss10.90|ppl54261.91[RANK1]:|epoch3|30/50batches|lr4.51|ms/batch697.72|loss10.89|ppl53372.39[RANK0]:|epoch3|40/50batches|lr4.51|ms/batch699.49|loss10.78|ppl47948.35[RANK1]:|epoch3|40/50batches|lr4.51|ms/batch699.50|loss10.79|ppl48664.42[RANK0]:-----------------------------------------------------------------------------------------[RANK0]:|endofepoch3|time:39.96s|validloss0.38|validppl1.46[RANK0]:-----------------------------------------------------------------------------------------[RANK1]:-----------------------------------------------------------------------------------------[RANK1]:|endofepoch3|time:39.96s|validloss0.38|validppl1.46[RANK1]:-----------------------------------------------------------------------------------------[RANK0]:=========================================================================================[RANK0]:|Endoftraining|testloss0.33|testppl1.39[RANK0]:=========================================================================================[RANK1]:=========================================================================================[RANK1]:|Endoftraining|testloss0.33|testppl1.39[RANK1]:=========================================================================================Total running time of the script:( 0 minutes  0.000 seconds)DownloadPythonsourcecode:ddp_pipeline.pyDownloadJupyternotebook:ddp_pipeline.ipynbGallery generated by Sphinx-Gallery