源碼解讀系列將和大家一起來讀Megatron的pretrain部分代碼。
在源碼解讀第一篇中,我們講解了如何做「分布式環境初始化」,即按照DP/TP/PP對進程進行分組,并為每個進程指定GPU。在這一章中,我們將一起讀「模型并行部分」:如何切分模型,并搬入分布式環境定義好的DP/TP/PP組中。
「本文將提供:」
- 詳細的圖解。畫圖說明代碼的設計架構,講清代碼想做一件什么事。
- 詳細的代碼注釋。在圖解的基礎上,提取核心代碼部分,并附上注釋。
「如何利用本文提高源碼閱讀效率:」
- 先看一~三部分。了解模型并行的設計思想、整體框架及入口函數。
- 打開Megatron源碼,找到入口函數,開始閱讀。
- 閱讀中的每一塊細節,可參考四~八部分。
「閱讀本文前置知識:」
- 圖解大模型訓練之:張量模型并行,Megatron-LM
- 圖解大模型訓練之:Megatron源碼解讀1,分布式環境初始化
「本文目錄:」
一、模型概述
二、模型切割在做一件什么事
- 2.1 模型切割設計思想
- 2.2 隨機種子
三、模型并行框架
- 3.1 模型并行入口函數
- 3.2 定義并搬運模型
- 3.3 分布式模型:CodeGeeX
四、MegatronModule
五、Emebdding
六、VocabParallelEmebdding
七、ParallelSelfAttention:分布式block的一般套路
- 7.1 列切割:ColumnParallelLinear
- 7.2 行切割:RowParallelLinear
- 7.3 ParallelSelfAttention
八、CrossEntropy
- 8.1 計算logit
- 8.2 計算交叉熵
九、筋疲力盡的總結
十、參考(本文相關源碼與論文)
一、模型概述
前文說過,用Megatron做分布式訓練的開源大模型有很多,我們選用的是THUDM開源的CodeGeeX(代碼生成式大模型,類比于openAI Codex)。選用它的原因是“完全開源”與“清晰的模型架構和預訓練配置圖”,能幫助我們高效閱讀源碼。我們再來回顧下這兩張圖。
「模型架構」
「預訓練配置」
由圖可知,CodeGeeX在預訓練中采用的是8頭TP(同一個node內的8張卡做TP,8張卡組成一個完整的模型),192頭DP(192個node間做DP),一共1536塊GPU進行訓練。
「【閱讀提醒】:如果你對GPT模型比較熟悉,則不需要花時間細看CodeGeeX架構圖也能無障礙閱讀本文。架構圖只是在涉及模型細節時,可以對照著看?!?/strong>
二、模型切割在做一件什么事
2.1 模型切割設計思想
回顧一下,在初始化分布式環境中,我們根據DP/TP/PP組設置并劃分了進程,確定了模型的切割方法,如下圖:(注意:這并不是CodeGeeX的劃分框架,而是一個更廣義的例子,細節可閱讀上篇講解)
接下來,我們就可以根據這個框架來切割模型了。pytorch默認將模型(nn.Module)定義在CPU上,因此,我們在CPU上定義并初始化模型,然后將其搬運到當前進程所對應的GPU上,整個過程如下圖:首先,我們是面向進程編程的,也就是整份腳本處理的是發生在1個進程上的事情。這樣做的好處是,我們只需要維護1份腳本,然后將其發去不同機器的各張卡上執行,就能實現全局的并行。
但是,1個進程處理的是模型的不同部分,比如GPT模型,它的pre層涉及到Embedding計算,post層涉及到softmax和loss的計算,這樣每個進程上處理的模型是不一樣的,這時怎么辦呢?別忘了,我們能夠取到進程id(全局或DP/TP/PP組內的),這樣我們就能通過進程id,寫if...else...
來解決模型差異化問題了。
明確了這個思想,現在我們可以開始寫代碼了,我們有兩種方式對模型進行切割:
- 「方案一:」先定義出完整的模型,并對模型參數做初始化,然后根據進程id取出相應子模型,搬運到GPU上
- 「方案二:」直接根據進程id,設計好當前子模型,做參數初始化,搬運到GPU上
這兩者的核心差別,在于“隨機種子”
的設定。
2.2 隨機種子
在分布式訓練中,「隨機種子是非常重要的,它關系到模型是否能夠復現」。例如我們采取activation checkpoint的技術來節省顯存時,在backward過程中我們需要重算forward得到activation,這時候就需要我們完整復現之前forward的過程,各類參數的初始化結果也要和之前完全一致。
我們來看幾個例子:
例1: Word Embedding
WE1和WE2間需要采用不同的隨機種子。因為若采用相同的隨機種子,則WE1和WE2的結果完全一樣,這不等價于先隨機初始化WE,再將它進行切割。
例2: dropout
左側方框中的2個dropout,在初始化時需要用不同的隨機種子。因為這樣才等價于對完整的dropout做初始化,然后再切割。右側方框中的dropout,需要用相同的隨機種子(雖然右邊只畫了1個dropout,但其實是2個dropout,每塊GPU上各一個,因為此時兩塊GPU上的輸出已經AllReduce,是完全一致的。做完AllReduce后,兩塊GPU繼續獨立計算,因此實際上有兩個dropout)。
關于隨機種子設定的一般結論
從例子中,我們可以得出一個結論:「一般在TP/PP組內,設定不同的隨機種子。而在DP組內,設定相同的隨機種子?!?/strong> 這只是一個一般結論,我們可以根據實際情況去調整。
最后,回到模型切割上,方案1(先做整體初始化再切割)在代碼里被稱為“CPU上的初始化”(_initialize_affine_weight_cpu
),方案2(直接對局部初始化)被稱為“在GPU上的初始化”(_initialize_affine_weight_gpu
)。我們會在切割部分的代碼里經??匆娝鼈?。
三、模型并行框架
現在,我們可以來看具體的代碼了
3.1 模型并行入口函數
模型并行部分的代碼入口依然在megatron/training.py
的pretrain
函數下,代碼如下:
defpretrain(
train_valid_test_dataset_provider,
model_provider,
forward_step_func,
valid_forward_step_func=None,
extra_args_provider=None,
args_defaults={},
):
#1.初始化分布式環境(源碼解讀1內容)
initialize_megatron(
extra_args_provider=extra_args_provider,args_defaults=args_defaults
)
...
# 2、模型并行:定義模型架構,并切割模型(本文重點)
model,optimizer,lr_scheduler=setup_model_and_optimizer(model_provider)
...
#3、構造train/val/test數據集(下一篇將講述)
...(
train_data_iterator,
valid_data_iterator,
test_data_iterator,
)=build_train_valid_test_data_iterators(train_valid_test_dataset_provider)
...
#4、訓練(下下一篇將講述)
iteration=train(
forward_step_func,
valid_forward_step_func,
model,
optimizer,
lr_scheduler,
train_data_iterator,
valid_data_iterator,
)
...
由代碼可知,setup_model_and_optimizer
是整個模型并行的入口函數,如下圖,它主要由”「定義模型架構并切割模型」“,“「設置optimizer」”和“「設置學習率」”三部分組成。我們關注的重點在第一部分上(get_model
)。
3.2 定義并搬運模型
get_model
的內容可簡化成下圖:
get_model
函數主要做了兩件事:
-
在CPU上定義模型。pytorch默認在CPU上定義模型(nn.Module)。
model_provider
是一個函數,調用它即可返回CPU版的模型,也就是一個CodeGeeX類,這個將是下文要介紹的重點。 -
把模型從CPU搬運至GPU上。這里有兩種方法可供選擇:
- 「顯式搬運?!?/strong>即手動將模型搬運到當前進程所對應的GPU上
- 「權重精度設定。」由ZeRO的思想可知,在模型訓練中,把權重精度從fp32降至fp16,是一種節省顯存的好辦法。如果確定使用這種優化辦法,將模型搬運到GPU上后,我們需要修改精度。
-
「初始化DP組」。這里指的是
定義DP組間forward、backward和梯度計算與通訊等方法
。在Megatron中,TP和PP組的這些方法是人為定義的
(在定義CPU模型時已設置好,我們將在下文講CodeGeeX細節時看到),而DP組則是可以用現成的
(torch的DistributedDataParallel)。在具體使用時,我們可以:(1)直接調用DistributedDataParallel?;颍?)在DistributedDataParallel這個類的基礎上做一些改進,例如增加對碎片化內存的管理,對計算梯度時的精度控制等。
- 「方案一:借助deepspeed進行管理」。在源碼解讀1中我們提過,秉持著萬物皆可wrap的原則,按照deepspeed官網教程,只需要在Megatron的某些文件中插入相應代碼,就可以讓deepspeed來管理模型的分布式、DP組間的顯存優化等,這里同理。
- 「方案二:手動搬運管理?!?/strong>這里需要我們以下事情:
get_model函數的核心代碼如下(一切盡在注釋中):
defget_model(model_provider_func):
"""Buildthemodel."""
args=get_args()
#1、定義并構建CPU版模型
if(#1.1、當分布式進行框架采用virtualpipeline(是NVDIA后續提出的對Megatron的優化方法,可先忽略不看)
mpu.get_pipeline_model_parallel_world_size()>1
andargs.virtual_pipeline_model_parallel_sizeisnotNone
):
model=[]
foriinrange(args.virtual_pipeline_model_parallel_size):
mpu.set_virtual_pipeline_model_parallel_rank(i)
#Setpre_processandpost_processonlyaftervirtualrankisset.
pre_process=mpu.is_pipeline_first_stage()
post_process=mpu.is_pipeline_last_stage()
this_model=model_provider_func(
pre_process=pre_process,post_process=post_process
)
model.append(this_model)
else:#1.2其余情況
#判斷當前進程是否是PP組的第一個進程(例如第一部分圖例中PP組的g0)
pre_process=mpu.is_pipeline_first_stage()
#判斷當前進程是否是PP組的最后一個進程(例如第一部分圖例中PP組的g12)
post_process=mpu.is_pipeline_last_stage()
#構建CPU版CodeGeeX模型
model=model_provider_func(pre_process=pre_process,post_process=post_process)
...
#2、將模型從CPU搬運到GPU上
#2.1如果采用Megatron-DeepSpeed的方式,則直接返回模型,后面的搬運,數據并行等工作將由deepspeed來完成
# ref:https://www.deepspeed.ai/tutorials/megatron/
ifargs.deepspeed:
returnmodel
#將當前進程所維護的模型,從CPU搬運到GPU上(GPU即為在初始化時為當前進程分配的那塊GPU)
print(f">movingmodeltoGPU...",flush=True)
formodel_moduleinmodel:
model_module.cuda(torch.cuda.current_device())
print(f">movingtoGPUdone",flush=True)
#fp16轉換(pytorch默認模型參數精度為fp32,依需決定計算過程中是否要轉成fp16,節省顯存)
ifargs.fp16orargs.bf16:
print(f">convertingmodeltofp16...",flush=True)
model=[Float16Module(model_module,args)formodel_moduleinmodel]
print(f">convertingtofp16done",flush=True)
#采用pytorch定義的DistributedDataParallel管理數據并行
ifargs.DDP_impl=="torch":
i=torch.cuda.current_device()
model=[
torchDDP(
model_module,
device_ids=[i],
output_device=i,
process_group=mpu.get_data_parallel_group(),#數據并行的組
)
formodel_moduleinmodel
]
returnmodel
#采用自定義的DistributedDataParallel管理數據并行
#即在pytorch的DistributedDataParallel的基礎上,自己再定義內存管理、梯度精度等計算方式,更有效利用顯存
ifargs.DDP_impl=="local":#自定義的數據并行類在megatron/model/distributed.py下
print(f">creatingDDPmodel...",flush=True)
model=[
LocalDDP(
model_module,
args.accumulate_allreduce_grads_in_fp32,
args.use_contiguous_buffers_in_ddp,
)
formodel_moduleinmodel
]
print(f">creatingDDPmodeldone",flush=True)
returnmodel
raiseNotImplementedError(
"UnknownDDPimplementationspecified:{}.""Exiting.".format(args.DDP_impl)
)
特別說明的是,前文提過模型的首尾兩層和中間層的架構可能不一樣,因此我們通過pre_process 和post_process來做區分。(當然你也能選擇用進程序id,只是首尾兩層經常被Q到,所以這里單獨明確了下)。對CodeGeeX來說,由它預訓練配置可知,它的PP并行度為1,也就是1塊GPU上涵蓋了模型的第一層至最后一層,所以pre_process和post_process實際上沒有用到。感興趣的朋友可以閱讀NVIDIA Megatron源碼下關于bert、gpt2的預訓練代碼,具體了解pre_process和post_process在定義模型時起的作用。
3.3 分布式模型:CodeGeeX
現在,我們來看最核心的分布式模型:CodeGeeX類。
前文說過,1個腳本處理的是1個進程上發生的事情,而1個進程對應的是模型的一部分。單進程的架構如下:
圖中每個方框都表示源碼里定義的一個nn.Module 類(除了最上的方框外)具體定義為:
-
CodeGeeX
: 定義一塊GPU上的模型。它由TransformerLanguageModel 和_VocabParallelCrossEntropy這兩個核心類組成。 -
TransformerLanguageModel
:定義每塊GPU上輸入層embedding和中間block層的結構 -
Embedding
: 定義每塊GPU上輸入層embedding結構及相關計算,輸出結果已AllReduce(TP組間) -
ParallelTransformer
:定義每塊GPU上所有中間blocks的結構及相關計算,輸出結果已AllReduce(TP組間) -
ParallelTransformerLayer
: 定義每塊GPU上單個block的結構及相關計算,輸出結果已AllReduce(TP組間) -
ParallelSelfAttention
: 定義每塊GPU上單個block中,attention的結構及相關計算,輸出結果已AllReduce(TP組間) -
ParallelMLP
: 定義每塊GPU上單個block中,mlp層的結構及相關計算,輸出結果已AllReduce(TP組間)。 -
_VocabParallelCrossEntropy
: torch.autograd.Function,定義每塊GPU上,輸出層embedding、softmax和loss等結構及相關計算。
「為什么需要對輸出做AllReduce?」回顧Megtron理論部分的講解,在縱向切割模型時,Megatron是在輸入X完整的情況下,設計模型切割的方式的。因此,對于模型的每一層輸出,我們都要在TP組間做AllReduce,來保證下一層拿到的輸入也是完整的。類名字中的"Parallel",也是指在TP組中做并行
,如下圖所示:
到這一步,我們終于把模型切割部分的整體流程講完了。「雖然我們是以CodeGeeX為例,但這個流程圖可以看作是通用的?!?/strong>不同模型間只有模型具體結構、DP/TP/PP組設置這些方面的差別,整個并行框架是通用的。下面,我們來探究圖中所繪的各個類的細節。
四、MegatronModule
上面所繪制的幾類,并不是直接繼承自nn.Module ,而是皆繼承于自定義的class MegatronModule(torch.nn.Module)
。我們說過,gpt類模型,輸入和輸出層共用一個word embedding。因此,這個類的主要作用,就是令PP組的第一個進程和最后一個進程滿足這個條件(不過我不懂為什么要把這個限制放在一個大母類中去做,設計上感覺有點奇怪)。MegatronModule類的整體架構如下:
特別說明,「initialize_word_embedding 并不是某一具體的初始化WE方法,它只是起到如圖所說的強制作用?!?/strong>
MegatronModule的代碼如下(一切盡在注釋中):
classMegatronModule(torch.nn.Module):
"""MegatronspecificextensionsoftorchModulewithsupport
forpipelining."""
def__init__(self,share_word_embeddings=True):
super(MegatronModule,self).__init__()
#input和output是否要共享一套WE
self.share_word_embeddings=share_word_embeddings
defstate_dict_for_save_checkpoint(
self,destination=None,prefix="",keep_vars=False
):
"""Usethisfunctiontooverridethestatedictfor
savingcheckpoints."""
#模型訓練中,及時將參數保存到指定位置(設置checkpoint),
#這樣在訓練出問題時,可以從checkpoint點重新load參數,繼續訓練
returnself.state_dict(destination,prefix,keep_vars)
defword_embeddings_weight(self):
"""獲取word_embedding"""
ifmpu.is_pipeline_first_stage(ignore_virtual=True):
returnself.language_model.embedding.word_embeddings.weight
ifmpu.is_pipeline_last_stage(ignore_virtual=True):
ifnotself.share_word_embeddings:
raiseException(#強制要求共享一套embedding
"word_embeddings_weight()calledforlast"
"stage,butshare_word_embeddingsisfalse"
)
returnself.word_embeddings.weight#參見initialize_word_embeddings中WE的定義
raiseException(#如果當前進程是PP組的中間進程,則其上未維護WE,因此當然獲取不到
"word_embeddings_weight()shouldbe""calledforfirstandlaststageonly"
)
definitialize_word_embeddings(self,init_method_normal):
"""強制PP組最后一個進程初始化WE時,直接使用PP組第一個進程的WE"""
args=get_args()
ifnotself.share_word_embeddings:#強制shareembeddingg
raiseException(
"initialize_word_embeddings()wascalledbut"
"share_word_embeddingsisfalse"
)
#PP組并行度為1時,第一層和最后一層都在一塊GPU上,天然共享WE,無需做強制
ifargs.pipeline_model_parallel_size==1:
return
#---------------------------------------------------
#如果流水線并行的度不為1時,依次做三件事:
#【初始化時】:
#1、在PP組最后一個進程上初始化一個WE,令其取值全為0
#2、在PP組第一個進程與最后一個進程間做一次AllReduce,保證兩者的WE完全一致
#【訓練時】:
#3、每次想在PP組第一個/最后一個進程上使用WE時,要做一次通信,保證兩者用的WE完全一致
ifmpu.is_pipeline_last_stage():#若當前進程是PP組最后一個進程
assertnotmpu.is_pipeline_first_stage()
self._word_embeddings_for_head_key="word_embeddings_for_head"
#初始化一個WE(已按vocab_size維度切割,可參見Megatron原理篇對WE的講解)
#VocabParallelEmbedding將在下文詳細講解
self.word_embeddings=mpu.VocabParallelEmbedding(
args.padded_vocab_size,#vocab_size
args.hidden_size,#embed_dim
init_method=init_method_normal(args.init_method_std),#初始化方法(在model/utils.py下)
)
#用0填充WE(等待下面做AllReduce后取得第一個進程上的WE)
self.word_embeddings.weight.data.fill_(0)
self.word_embeddings.weight.shared=True
iftorch.distributed.is_initialized():
ifmpu.is_pipeline_first_stage()ormpu.is_pipeline_last_stage():#若當前進程是PP組第一個或最后一個進程
#在兩進程間做AllReduce,保證它們使用的WE完全一致
# mpu.get_embedding_group:在源碼解讀1中講過,是除DP/TP/PP之外設置的又一進程組,
#主要就是用來做關于WE的通訊
torch.distributed.all_reduce(
self.word_embeddings_weight().data,group=mpu.get_embedding_group()
)
else:
print(
"WARNING!Distributedprocessesaren'tinitialized,so"
"wordembeddingsinthelastlayerarenotinitialized."
"Ifyouarejustmanipulatingamodelthisisfine,but"
"thisneedstobehandledmanually.Ifyouaretraining"
"somethingisdefinitelywrong."
)
五、Embedding
Emebdding類定義了word/position/segment embedding
,并定義輸入X過embedding層的計算方法。關鍵屬性和方法如下圖:
-
self.word_embeddings
:來自自定義的VocabParallelEmbedding (下面會詳述) 。「含“Parallel”則意味著參數在TP組間做了切割」。因此self.word_embeddings 是切割好的WE。每個進程上維護根據自己進程序號所取下的那塊WE(例如下圖中的WE1,WE2,圖片來自Megatron原理篇):
-
self.position_embeddings
和self.tokentype_embeddings
這兩者都和輸入X相關,而輸入X是不做切割的,因此這兩者也無需切割。 -
state_dict_for_save_checkpoint
和load_state_dict
。在源碼注解里,這兩個函數分別給出了"easy load" 和"customize load"的注釋,這個注釋不是很貼切。實際上,前者用于在模型訓練過程中及時讀取當前參數,及時保存(做checkpoint);后者則一般用于模型的重載,例如訓到一半掛掉了,我們就重新初始化一個新模型,重載上個checkpoint保存下的權重。
Embedding層代碼如下(一切盡在注釋中):
classEmbedding(MegatronModule):
"""Languagemodelembeddings.
Arguments:
hidden_size:hiddensize
vocab_size:vocabularysize
max_sequence_length:maximumsizeofsequence.This
isusedforpositionalembedding
embedding_dropout_prob:dropoutprobabilityforembeddings
init_method:weightinitializationmethod
num_tokentypes:sizeofthetoken-typeembeddings.0value
willignorethisembedding
"""
def__init__(
self,
hidden_size,#每個token的向量維度
vocab_size,#詞表大小
max_sequence_length,#最長序列長度
embedding_dropout_prob,#dropoutprobabilityforembeddings
init_method,#初始化權重的方法
num_tokentypes=0,#類似于Bert中的segmenttype
):
super(Embedding,self).__init__()
args=get_args()
self.hidden_size=hidden_size
self.init_method=init_method
self.num_tokentypes=num_tokentypes
self.max_sequence_length=max_sequence_length
#WEsize:(vocab_size//TP_N,hidden_size)
#TP_N表示TP組模型并行度
self.word_embeddings=mpu.VocabParallelEmbedding(
vocab_size,self.hidden_size,init_method=self.init_method)
self._word_embeddings_key='word_embeddings'
self.vocab_size=vocab_size
#PEsize:(max_seq_len,hidden_size)
self.position_embeddings=torch.nn.Embedding(
max_sequence_length,self.hidden_size)
self.position_embeddings=self.position_embeddings.half()
self._position_embeddings_key='position_embeddings'
#Initializethepositionembeddings.
self.init_method(self.position_embeddings.weight)
#TE_size:(num_tokentypes,hidden_size)
#TE類似于Bert中的segmentembedding
self._tokentype_embeddings_key='tokentype_embeddings'
ifself.num_tokentypes>0:
self.tokentype_embeddings=torch.nn.Embedding(self.num_tokentypes,
self.hidden_size)
#Initializethetoken-typeembeddings.
self.init_method(self.tokentype_embeddings.weight)
else:
self.tokentype_embeddings=None
#Embeddingsdropout
self.embedding_dropout=torch.nn.Dropout(embedding_dropout_prob)
defadd_tokentype_embeddings(self,num_tokentypes):
"""如果在pretrain階段未定義TE,而在fine-tune階段TE,則可通過此函數添加
"""
ifself.tokentype_embeddingsisnotNone:
raiseException('tokentypeembeddingsisalreadyinitialized')
iftorch.distributed.get_rank()==0:
print('addingembeddingfor{}tokentypes'.format(num_tokentypes),
flush=True)
self.num_tokentypes=num_tokentypes
self.tokentype_embeddings=torch.nn.Embedding(num_tokentypes,
self.hidden_size)
#Initializethetoken-typeembeddings.
self.init_method(self.tokentype_embeddings.weight)
defforward(self,input_ids,position_ids,tokentype_ids=None):
"""定義輸入X過embedding層的計算方法
"""
#words_embeddingssize=(b,seq_len,hidden_size)
#再次注意:self.word_embeddings做forward時,最終的輸出結果時AllReduce的(見上圖)
words_embeddings=self.word_embeddings(input_ids)
#position_embeddingssize=(b,seq_len,hidden_size)
position_embeddings=self.position_embeddings(position_ids)
#embedding=WE+PE
#embeddingsize=(b,seq_len,hidden_size)
embeddings=words_embeddings+position_embeddings
#依需要決定是否增加TE
iftokentype_idsisnotNone:
assertself.tokentype_embeddingsisnotNone
embeddings=embeddings+self.tokentype_embeddings(tokentype_ids)
else:
assertself.tokentype_embeddingsisNone
#Dropout.
embeddings=self.embedding_dropout(embeddings)
returnembeddings
defstate_dict_for_save_checkpoint(
self,destination=None,prefix='',keep_vars=False,
):
"""Foreasyload.
在模型訓練過程中及時讀取當前參數,方便及時保存(做checkpoint)
篇幅限制,這里不展示細節
"""
...
defload_state_dict(self,state_dict,strict=True):
"""Customizedload.
用于模型的重載。例如訓到一半掛掉了,我們就重新初始化一個新模型,
重載上個checkpoint保存下的權重。
篇幅限制,這里不展示細節
"""
...
六、VocabParallelEmbedding
該類用于定義分布式的word embedding,整體架構如下,同樣只列舉了核心屬性和方法:
具體代碼如下,可以特別關注「初始化和forward」部分,同時建議大家閱讀理論篇中關于這一過程的詳細講解(一切盡在注釋中)
classVocabParallelEmbedding(torch.nn.Module):
"""Embeddingparallelizedinthevocabularydimension.
Thisismainlyadaptedfromtorch.nn.Embeddingandallthedefault
valuesarekept.
Arguments:
num_embeddings:vocabularysize.
embedding_dim:sizeofhiddenstate.
init_method:methodtoinitializeweights.
"""
def__init__(self,num_embeddings,embedding_dim,init_method=init.xavier_normal_):
super(VocabParallelEmbedding,self).__init__()
#Keeptheinputdimensions.
self.num_embeddings=num_embeddings#vocab_size
self.embedding_dim=embedding_dim#hidden_state.
#Setthedetaulsforcompatibility.
self.padding_idx=None
self.max_norm=None
self.norm_type=2.0
self.scale_grad_by_freq=False
self.sparse=False
self._weight=None
#當前進程所在TP組進程總數
self.tensor_model_parallel_size=get_tensor_model_parallel_world_size()
#根據當前進程在TP組中的序號,確定其所需維護的WE部分,沿著vocab維度對WE進行切割
#例如,進程id=0, 維護詞表序號[0,5)范圍內的數據;進程id=1,維護[5,10)
(
self.vocab_start_index,
self.vocab_end_index,
)=VocabUtility.vocab_range_from_global_vocab_size(
self.num_embeddings,
get_tensor_model_parallel_rank(),
self.tensor_model_parallel_size,
)
#計算當前進程維護的詞表大小
self.num_embeddings_per_partition=(
self.vocab_end_index-self.vocab_start_index
)
#對WE做初始化
args=get_args()#讀取預訓練參數配置
ifargs.use_cpu_initialization:#CPU上做初始化
self.weight=Parameter(#在CPU上先生成一個完整的WE
torch.empty(
self.num_embeddings_per_partition,
self.embedding_dim,
dtype=args.params_dtype,
#dtype=torch.float32,
)
)
#對CPU上的WE做切割(隨機種子在初始化分布式中已設定好,不用變)
_initialize_affine_weight_cpu(
self.weight,
self.num_embeddings,
self.embedding_dim,
self.num_embeddings_per_partition,
0,
init_method,#初始化權重的方法,例如xavier之類
)
else:#在GPU上做初始化
self.weight=Parameter(#生成一個切割好的WE
torch.empty(
self.num_embeddings_per_partition,
self.embedding_dim,
device=torch.cuda.current_device(),
dtype=args.params_dtype,
#dtype=torch.float32,
)
)
#在GPU上做初始化,注意TP組內不同進程采用不同的隨機種子
_initialize_affine_weight_gpu(
self.weight,init_method,partition_dim=0,stride=1
)
defforward(self,input_):
"""定義輸入X過WE的計算方法,輸出結果已經過AllReduce"""
ifself.tensor_model_parallel_size>1:#如果使用TP
#如果在當前進程維護的WE上,找不到對應的單詞,那么對應位置就賦0
#例如當前的數據的tokenid是:[2,7,1,5],當前維護的詞表是[0,1,2](start_index=0, end_index = 3),
#則mask之后的數據為[2,0,1,0]
#Buildthemask.
input_mask=(input_=self.vocab_end_index
)
#Masktheinput.
masked_input=input_.clone()-self.vocab_start_index
masked_input[input_mask]=0
else:
masked_input=input_
#輸入X,過當前進程維護的部分WE的結果
output_parallel=F.embedding(
masked_input,#tensorcontainingindicesintotheembeddingmatrix
self.weight,#切割好的wordembedding的權重
self.padding_idx,
self.max_norm,
self.norm_type,
self.scale_grad_by_freq,
self.sparse,
)
#當前詞表不維護的部分,都設為0
ifself.tensor_model_parallel_size>1:
output_parallel[input_mask,:]=0.0#
#將TP組各GPU上的結果做AllReduce
output=reduce_from_tensor_model_parallel_region(output_parallel)
returnoutput
def_initialize_affine_weight_cpu(...):
"""CPU版權重初始化。這個不難,大家可以自己閱讀"""
...
def_initialize_affine_weight_gpu(...):
"""GPU版權重初始化。特別關注設置隨機種子部分"""
...
#借助deepspeed或自定義的get_cuda_rng_tracker方法,對隨機種子進行操作
#get_cuda_rng_tracker細節,大家可自行閱讀源碼
ifds_checkpointing.is_configured():
globalget_cuda_rng_tracker
get_cuda_rng_tracker=ds_checkpointing.get_cuda_rng_tracker
withget_cuda_rng_tracker().fork():
init_method(weight)
七、ParallelSelfAttention:分布式block的一般套路
【閱讀提示】:閱讀本節時可:
- 對照第一部分CodeGeeX框架圖
- 對照Megatron理論篇對矩陣切分的講解
首先來看切割Attention的示意圖,由圖可知,「對QKV矩陣,采用“列切割”,對線性矩陣B,采用“行切割”」。這樣設計的好處是,在經過QKV的計算后,各進程在不用通訊的前提下,繼續做線性計算,直到最后一步才AllReduce,起到降低通訊成本的作用:
我們先單獨來看“列切割”與“行切割”的實現代碼。Megatron將它們定義成了兩個nn.Module類。
7.1 列切割:ColumnParallelLinear
列切割示意圖如下:
-
f和g
是兩個共軛算子,可理解為兩個torch.autograd.Function類。在這個類下,我們可以「根據需要重寫forward和backward方法」。 -
f
: 「forward中,直接copy輸入;backward中,對梯度做AllReduce」。在代碼里定義為class _CopyToModelParallelRegion(torch.autograd.Function) -
g
: 「forward中,all-gather輸出;backward中,對梯度做split」(每張卡經過all-gather已有完整的Y了,因此以Y為起點計算梯度后,沿著列做split就可得到Y1和Y2的梯度)。在代碼里定義為class _GatherFromModelParallelRegion(torch.autograd.Function)
classColumnParallelLinear(torch.nn.Module):
"""Linearlayerwithcolumnparallelism.
ThelinearlayerisdefinedasY=XA+b.Aisparallelizedalong
itsseconddimensionasA=[A_1,...,A_p].
Arguments:
input_size:firstdimensionofmatrixA.
output_size:seconddimensionofmatrixA.
bias:Iftrue,addbias
gather_output:Iftrue,callall-getheronoutputandmakeYavaiable
toallGPUs,otherwise,everyGPUwillhaveitsoutput
whichisY_i=XA_i
init_method:methodtoinitializeweights.Notethatbiasisalwaysset
tozero.
stride:Forthestridedlinearlayers.
keep_master_weight_for_test:Thiswasaddedfortestingandshouldbe
settoFalse.Itreturnsthemasterweights
usedforinitialization.
skip_bias_add:Thiswasaddedtoenableperformanceoptimationswherebias
canbefusedwithotherelementwiseoperations.weskip
addingbiasbutinsteadreturnit.
"""
#該類定義了切割后的權重W,例如對上圖來說,W1和W2都可分別視為該類的一個實例
def__init__(
self,
input_size,#W的第一個維度
output_size,#W的第二個維度
bias=True,#是否需要引入bias
gather_output=True,#決定是否要將Y1和Y2做all-gather
init_method=init.xavier_normal_,
stride=1,
keep_master_weight_for_test=False,
skip_bias_add=False,
params_dtype=None,
skip_init=False,
device=None,
):
super(ColumnParallelLinear,self).__init__()
#Keepinputparameters
self.input_size=input_size
self.output_size=output_size
self.gather_output=gather_output
#Dividetheweightmatrixalongthelastdimension.
#當前進程所在TP組的總進程數
world_size=get_tensor_model_parallel_world_size()
#每塊GPU上維護的hidden_size的大小,等于原hidden_zize//TP組總進程數
self.output_size_per_partition=divide(output_size,world_size)
self.skip_bias_add=skip_bias_add
self.params_dtype=params_dtype
self.device=device
#Parameters.
#Note:torch.nn.functional.linearperformsXA^T+bandasaresult
#Initializeweight.
args=get_args()#取得命令行所有的參數
ifnotskip_init:
ifargs.use_cpu_initialization:#CPU上初始化
self.weight=Parameter(
torch.empty(
self.output_size_per_partition,
self.input_size,
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype,
)
)
self.master_weight=_initialize_affine_weight_cpu(#
self.weight,
self.output_size,
self.input_size,
self.output_size_per_partition,
0,
init_method,
stride=stride,
return_master_weight=keep_master_weight_for_test,
)
else:#GPU上初始化
self.weight=Parameter(
torch.empty(
self.output_size_per_partition,
self.input_size,
device=self.deviceifself.deviceisnotNoneelsetorch.cuda.current_device(),
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype,
)
)
_initialize_affine_weight_gpu(
self.weight,init_method,partition_dim=0,stride=stride
)
else:
self.register_parameter("weight",None)
#對bias做處理,道理同weight
ifbiasandnotskip_init:
ifargs.use_cpu_initialization:#CPU上初始化
self.bias=Parameter(
torch.empty(self.output_size_per_partition,
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype)
)
else:
self.bias=Parameter(#GPU上初始化
torch.empty(
self.output_size_per_partition,
device=self.deviceifself.deviceisnotNoneelsetorch.cuda.current_device(),
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype,
)
)
set_tensor_model_parallel_attributes(self.bias,True,0,stride)
#Alwaysinitializebiastozero.
withtorch.no_grad():
self.bias.zero_()
else:
self.register_parameter("bias",None)
defforward(self,input_):
#定義列切割中的f算子
#調用copy_to_tensor_model_parallel_region則新建一個_CopyToModelParallelRegion實例(見下)
input_parallel=copy_to_tensor_model_parallel_region(input_)
bias=self.biasifnotself.skip_bias_addelseNone#定義bias
output_parallel=F.linear(input_parallel,self.weight,bias)#X*切割好的權重
#決定是否要對每個進程上的輸出結果做All-Reduce
ifself.gather_output:
#定義列切割中的g算子
#調用gather_from_tensor_model_parallel_region則新建一個_GatherFromModelParallelRegion實例(見下)
output=gather_from_tensor_model_parallel_region(output_parallel)#把各GPU上的輸出按照列gather起來后,作為最終輸出
else:
output=output_parallel#否則最終輸出還是自己算的那塊GPU
output_bias=self.biasifself.skip_bias_addelseNone
returnoutput,output_bias
#列切割中的f與g
class_CopyToModelParallelRegion(torch.autograd.Function):
"""Passtheinputtothemodelparallelregion."""
#列切割下的f算子
# forward:copy輸入
# backward:對梯度做AllReduce
@staticmethod
defsymbolic(graph,input_):
returninput_
@staticmethod
defforward(ctx,input_):
returninput_
@staticmethod
defbackward(ctx,grad_output):
return_reduce(grad_output)
class_GatherFromModelParallelRegion(torch.autograd.Function):
"""Gathertheinputfrommodelparallelregionandconcatinate."""
#列切割中的g算子
# forward:All-Gather輸出
# backward:對梯度,沿著列方向做split
@staticmethod
defsymbolic(graph,input_):
return_gather(input_)
@staticmethod
defforward(ctx,input_):
return_gather(input_)
@staticmethod
defbackward(ctx,grad_output):
return_split(grad_output)
7.2 行切割:RowParallelLinear
-
f
: forward中,按列split輸入;backward中,all-gather梯度 -
g
: forward中,AllReduce輸出;backward中,直接輸出梯度,無需做任何通訊(因為經過g的foward,每塊GPU上已擁有了Yi和Y,則根據圖中g的backward公式可知,每塊GPU可獨立計算梯度)
代碼如下:
classRowParallelLinear(torch.nn.Module):
"""Linearlayerwithrowparallelism.
ThelinearlayerisdefinedasY=XA+b.Aisparallelizedalong
itsfirstdimensionandXalongitsseconddimensionas:
--
|A_1|
|.|
A=|.|X=[X_1,...,X_p]
|.|
|A_p|
--
Arguments:
input_size:firstdimensionofmatrixA.
output_size:seconddimensionofmatrixA.
bias:Iftrue,addbias.Notethatbiasisnotparallelized.
input_is_parallel:Iftrue,weassumethattheinputisalready
splitacrosstheGPUsandwedonotsplit
again.
init_method:methodtoinitializeweights.Notethatbiasisalwaysset
tozero.
stride:Forthestridedlinearlayers.
keep_master_weight_for_test:Thiswasaddedfortestingandshouldbe
settoFalse.Itreturnsthemasterweights
usedforinitialization.
skip_bias_add:Thiswasaddedtoenableperformanceoptimationswherebias
canbefusedwithotherelementwiseoperations.weskip
addingbiasbutinsteadreturnit.
"""
def__init__(
self,
input_size,
output_size,
bias=True,
input_is_parallel=False,
init_method=init.xavier_normal_,
stride=1,
keep_master_weight_for_test=False,
skip_bias_add=False,
params_dtype=None,
skip_init=False,
device=None,
):
super(RowParallelLinear,self).__init__()
#Keepinputparameters
self.input_size=input_size
self.output_size=output_size
self.input_is_parallel=input_is_parallel
#Dividetheweightmatrixalongthelastdimension.
world_size=get_tensor_model_parallel_world_size()
self.input_size_per_partition=divide(input_size,world_size)
self.skip_bias_add=skip_bias_add
self.params_dtype=params_dtype
self.device=device
#Parameters.
#Note:torch.nn.functional.linearperformsXA^T+bandasaresult
#weallocatethetranspose.
#Initializeweight.
args=get_args()
ifnotskip_init:
ifargs.use_cpu_initialization:
self.weight=Parameter(
torch.empty(
self.output_size,
self.input_size_per_partition,
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype,
)
)
self.master_weight=_initialize_affine_weight_cpu(
self.weight,
self.output_size,
self.input_size,
self.input_size_per_partition,
1,
init_method,
stride=stride,
return_master_weight=keep_master_weight_for_test,
)
else:
self.weight=Parameter(
torch.empty(
self.output_size,
self.input_size_per_partition,
device=self.deviceifself.deviceisnotNoneelsetorch.cuda.current_device(),
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype,
)
)
_initialize_affine_weight_gpu(
self.weight,init_method,partition_dim=1,stride=stride
)
else:
self.register_parameter("weight",None)
ifbiasandnotskip_init:
ifargs.use_cpu_initialization:
self.bias=Parameter(
torch.empty(self.output_size,
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype)
)
else:
self.bias=Parameter(
torch.empty(
self.output_size,
device=self.deviceifself.deviceisnotNoneelsetorch.cuda.current_device(),
dtype=self.params_dtypeifself.params_dtypeisnotNoneelseargs.params_dtype,
)
)
#Alwaysinitializebiastozero.
withtorch.no_grad():
self.bias.zero_()
else:
self.register_parameter("bias",None)
defforward(self,input_):
#Setupbackpropall-reduce.
ifself.input_is_parallel:
input_parallel=input_
else:
input_parallel=scatter_to_tensor_model_parallel_region(input_)
#Matrixmultiply.
output_parallel=F.linear(input_parallel,self.weight)
#All-reduceacrossallthepartitions.
output_=reduce_from_tensor_model_parallel_region(output_parallel)
ifnotself.skip_bias_add:
output=output_+self.biasifself.biasisnotNoneelseoutput_
output_bias=None
else:
output=output_
output_bias=self.bias
returnoutput,output_bias
#行切割中的f和g算子
class_ScatterToModelParallelRegion(torch.autograd.Function):
"""Splittheinputandkeeponlythecorrespondingchucktotherank."""
#行切割中的f算子
# forward:沿列split輸入
# backward:all-gather梯度
@staticmethod
defsymbolic(graph,input_):
return_split(input_)
@staticmethod
defforward(ctx,input_):
return_split(input_)
@staticmethod
defbackward(ctx,grad_output):
return_gather(grad_output)
class_ReduceFromModelParallelRegion(torch.autograd.Function):
"""All-reducetheinputfromthemodelparallelregion."""
#行切割中的g算子
# forward:AllReduce輸出
# backward:正常計算梯度,GPU間無需做任何通訊
@staticmethod
defsymbolic(graph,input_):
return_reduce(input_)
@staticmethod
defforward(ctx,input_):
return_reduce(input_)
@staticmethod
defbackward(ctx,grad_output):
returngrad_output
7.3 ParallelSelfAttention
該類的構造如下圖:
這張圖中透露的核心含義是,「每個進程上維護的都是按列切割完的QKV矩陣」,進程間獨立計算,QKV矩陣的輸出結果一般不做AllReduce。同時,「每個進程上維護的是按行切割完的dense(線型層)矩陣」,Attention輸出過線性層后的結果,做AllReduce。另外,在設置attention_dropout時,同樣調用了get_cuda_rng_tracker 方法,令TP組內的進程擁有不同的隨機種子。「最后,你可能想問,dense后的dropout去哪里了」?代碼里把它定義到了ParallelTransformerLayer 下(等于attention + mlp)。
相信有了上面的說明,看這塊代碼就不難了。篇幅限制,這里不展示代碼了。大家可以對照著CodeGeeX架構圖,來看這里multi-head attention的計算方式。
ParallelMLP,ParallelTransformerLayer和ParallelTransformer都采用的是一樣的套路,也略過不言。
八、CrossEntropy
現在,終于可以來看模型的最后一層:交叉熵的平行計算。核心類為_VocabParallelCrossEntropy
我們在原理篇中講過交叉熵的并行計算,其優化核心是將通訊量從b*s*v降至b*s。但是Megatron代碼中定義的交叉熵計算方式,稍微復雜一些,也和我們一般理解的交叉熵有些許差異。所以我們先用圖解,來看下代碼定義的交叉熵計算流程:
【注】:
- 對X和Y_i來說,(b, s, h)維度下應該畫成一個立方體,為了表達簡練,這里將b拍平了。
- 對其余維度中含b的矩陣,b正常表示,即row=b
8.1 計算logit
首先,在使用_VocabParallelCrossEntropy 計算交叉熵前,我們需要計算logit。這時我們調用parallel_lm_logits 函數,將模型最后一層的輸出X(復習一下,這個X已經在TP組內AllReduce了),乘上當前進程上維護的輸入層WE的轉置(復習一下,輸入層和輸出層共用一套embedding),得到當前進程的logit Y_i,「同時我們選擇不對輸出logit做AllReduce」。
你可能會有一個疑惑:「在Transformer中,輸出層會額外訓練一個線性矩陣,來計算logit;為什么在gpt中,可以用輸入層WE的轉置來代替這個線性矩陣?」
這個問題的答案,對理解Megatron交叉熵計算也至關重要。我們可「將X*WE^T結果理解成“X與WE間的相似度”」,例如對Y1來說,它的第一行中的每個logit,表示第一個token與詞表里每個詞的相似度。
注意到每個進程上只維護部分WE。例如,假設詞表共有5個單詞,WE1維護前5個單詞,WE2維護后5個單詞。因此再嚴格來說:「對Y1,它的第一行中的每個logit,表示第一個token與詞表中前5個詞的相似度;對Y2,它的第一行中的每個logit,表示第一個token與詞表中后5個詞的相似度。我們要記住這個含義。」
8.2 計算交叉熵
知道了logit的含義,我們來看交叉熵計算。
首先做了一系列求max的計算,得到基于全局的max(logit),再將orig_logit - max(logit),得到處理后的結果。這步理解起來不難,主要目的是為了防止計算溢出。
「接下來,就是基于logit算loss了?!?/strong>
-
每個進程上都有一份(b, s)維度的真值,它表示每個token的真值是哪個詞(詞用id表示)。我們基于這份真值,在Y_i上找出真值位置的logit。例如:seq_length = 3,即我們需要對3個token去做預測,假設前兩個token的真值在第1個進程所維護的WE1中,最后一個token的真值在第2個進程所維護的WE2中。那么我們去Y1的前兩行里,取出真值位置的logit,這個logit表示“token與真值的相似度”,去Y2的最后一行里做同樣操作。
-
這樣,我們就能得到L1和L2,和真值位置不對應的地方,統一填充0。隨后對L1和L2做AllReduce,得到L。「L中的每行表示“token與真值間的相似度"」
-
現在,我們回來對Y1和Y2的每一行求sum(e^logit),得到e1和e2。將e1和e2做AllReduce,得到e。「e中的每行表示“token和詞表中所有詞相似度的總和”」
-
我們希望「(token和詞表中所有詞相似度的總和-token與真值間的相似度) /token和詞表中所有詞相似度的總和」這個值最小,這個差值就是最終的loss。
8.3 代碼
理清了這點,現在可以來看代碼了(一切盡在注釋中),建議對這塊還有疑問的朋友,可以寫個test腳本把中間結果打印出來,方便理解:
class_VocabParallelCrossEntropy(torch.autograd.Function):
"""
分布式計算Loss
"""
@staticmethod
defforward(ctx,vocab_parallel_logits,target):
#1.logit-globalmax(logit)操作,主要目的是防溢出
logits_max=torch.max(vocab_parallel_logits,dim=-1)[0]#(b,s,1)
torch.distributed.all_reduce(#(b,s,1)
logits_max,
op=torch.distributed.ReduceOp.MAX,#找全局最大值
group=get_tensor_model_parallel_group(),
)
#Subtractthemaximumvalue.
vocab_parallel_logits.sub_(logits_max.unsqueeze(dim=-1))#原始GPU上維護的logits減去每行最大值(防止溢出)
#2、根據當前進程id,取出當前進程所維護詞表序號等信息
#函數,能夠獲取當前進程所維護詞表的start_index和end_index
get_vocab_range=VocabUtility.vocab_range_from_per_partition_vocab_size
#這塊GPU上logits最后一維的大小,等于所維護的詞表的大?。╲/N)
partition_vocab_size=vocab_parallel_logits.size()[-1]
#取得當前進程所在TP組中的序號
rank=get_tensor_model_parallel_rank()
#取得當前進程所在TP組的總進程數
world_size=get_tensor_model_parallel_world_size()
#取得當前進程所維護的詞表的start_index和end_index
vocab_start_index,vocab_end_index=get_vocab_range(
partition_vocab_size,rank,world_size
)
#3.基于真值,取出每個token在真值位置上的logit(即和真值的相似度)
#Createamaskofvalidvocabids(1meansitneedstobemasked)
target_mask=(target=vocab_end_index)#target=(b,s)
masked_target=target.clone()-vocab_start_index
masked_target[target_mask]=0
#Getpredicted-logits=logits[target].
#ForSimplicity,weconvertlogitstoa2-Dtensorwithsize
#[*,partition-vocab-size]andtargettoa1-Dtensorofsize[*].
logits_2d=vocab_parallel_logits.view(-1,partition_vocab_size)#(b*s,v/N)
masked_target_1d=masked_target.view(-1)#(b*s)
arange_1d=torch.arange(#[b*s]
start=0,end=logits_2d.size()[0],device=logits_2d.device
)
#logits_2d[arange_1d,masked_target_1d]:
# tensor的切片操作。arange_1d:取出所有的行。masked_target_1d:取出logit
predicted_logits_1d=logits_2d[arange_1d,masked_target_1d]#(b*s)
predicted_logits_1d=predicted_logits_1d.clone().contiguous()
predicted_logits=predicted_logits_1d.view_as(target)#(b,s)
predicted_logits[target_mask]=0.0
#AllreduceisneededtogetthechunksfromotherGPUs.
torch.distributed.all_reduce(#allreduce之后得到的logit矩陣為(b,s),每一個位置表示對應真值位置的預測logit
predicted_logits,
op=torch.distributed.ReduceOp.SUM,
group=get_tensor_model_parallel_group(),
)
#SumofexponentialoflogitsalongvocabdimensionacrossallGPUs.
exp_logits=vocab_parallel_logits#(b,s,v/N)
torch.exp(vocab_parallel_logits,out=exp_logits)
sum_exp_logits=exp_logits.sum(dim=-1)#(b,s)
torch.distributed.all_reduce(
sum_exp_logits,
op=torch.distributed.ReduceOp.SUM,
group=get_tensor_model_parallel_group(),
)
#4.計算Loss=log(sum(exp(logits)))-predicted-logit.
loss=torch.log(sum_exp_logits)-predicted_logits#(b,s)
#Storesoftmax,target-maskandmasked-targetforbackwardpass.
exp_logits.div_(sum_exp_logits.unsqueeze(dim=-1))
ctx.save_for_backward(exp_logits,target_mask,masked_target_1d)
returnloss
@staticmethod
defbackward(ctx,grad_output):
#Retreivetensorsfromtheforwardpath.
softmax,target_mask,masked_target_1d=ctx.saved_tensors
#Alltheinputshavesoftmaxastheirgradient.
grad_input=softmax
#Forsimplicity,workwiththe2Dgradient.
partition_vocab_size=softmax.size()[-1]
grad_2d=grad_input.view(-1,partition_vocab_size)
#Addthegradientfrommatchingclasses.
arange_1d=torch.arange(start=0,end=grad_2d.size()[0],device=grad_2d.device)
grad_2d[arange_1d,masked_target_1d]-=1.0-target_mask.view(-1).float()
#Finallyelementwisemultiplicationwiththeoutputgradients.
grad_input.mul_(grad_output.unsqueeze(dim=-1))
returngrad_input,None
九、總結
啊這總結怎么寫呢,嘔心瀝血終于寫完了。希望能給到大家幫助!
-
gpu
+關注
關注
28文章
4729瀏覽量
128890 -
源碼
+關注
關注
8文章
639瀏覽量
29185 -
大模型
+關注
關注
2文章
2423瀏覽量
2643
原文標題:圖解大模型訓練之:Megatron源碼解讀2,模型并行
文章出處:【微信號:GiantPandaCV,微信公眾號:GiantPandaCV】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論