常見的數(shù)據(jù)傾斜是怎么造成的?
Shuffle的時候,將各個節(jié)點上相同的key拉取到某個節(jié)點的一個task進行處理,比如按照key進行聚合或join等操作,如果某個key對應的數(shù)據(jù)量特別大的話,就會發(fā)生數(shù)據(jù)傾斜現(xiàn)象。數(shù)據(jù)傾斜就成為了整個task運行時間的短板。
觸發(fā)shuffle的常見算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。
要解決數(shù)據(jù)傾斜的問題,首先要定位數(shù)據(jù)傾斜發(fā)生在什么地方。
首先是哪個stage,直接在Web UI上看就可以,一般出現(xiàn)傾斜都是耗時特別長的Stage,然后查看運行耗時的task,一般是其中的某幾個Task一直拖著,其他的Task早已經(jīng)完成了,根據(jù)這個task,根據(jù)stage劃分原理,推算出數(shù)據(jù)傾斜發(fā)生在哪個shuffle類算子上。
如何查看發(fā)生傾斜的RDD呢?
如果是Spark RDD執(zhí)行shuffle算子導致的數(shù)據(jù)傾斜,那么可以在Spark作業(yè)中加入查看key分布的代碼,比如RDD.countByKey()。然后對統(tǒng)計出來各個key出現(xiàn)的次數(shù),collect、take到客戶端打印一下,就可以看到key的分布情況。
以下方法可以大概看出哪個key出現(xiàn)了傾斜:
JavaPairRDDhssData = getHssData(fs, sc, hssPath);
JavaPairRDDsample = hssData.sample(false, 0.1);
MapcountByKey = sample.countByKey();
出現(xiàn)傾斜的key有兩種情況:
1、某個可以出現(xiàn)傾斜
2、多個key出現(xiàn)傾斜
某個Key出現(xiàn)傾斜解決辦法:
通過上述方法可以知道是哪個Key出現(xiàn)了傾斜,所以可以先通過filter方法過濾掉傾斜的Key,把傾斜的Key和沒有傾斜的Key分開處理,由于Spark運行機制,所以單獨處理傾斜Key的時候就不會再出現(xiàn)傾斜現(xiàn)象。
上述方法只能處理特定的數(shù)據(jù)傾斜,對于實際的生產(chǎn)環(huán)境可能并不怎么適用,這事是解決傾斜的其中一個方法。
多個Key出現(xiàn)傾斜的解決辦法:
原理:在傾斜Shuffle之前給每一個Key都加上一個隨機前綴,然后再給加了前綴的Key進行一個Shuffle操作,在Shuffle操作后再把Key的前綴去掉。在這個過程中由于前綴的加入,會把傾斜的Key隨機的分配到不同的Task。然后去掉前綴從而解決數(shù)據(jù)傾斜的問題。
private static JavaPairRDDrepar(
JavaPairRDD。Cdr) {
JavaPairRDDmapToPair;
try {
mapToPair = 。Cdr
.mapToPair(new PairFunctiontuple2, String, agg() {
@Override
public Tuple2call(Tuple2t)
throws Exception {
//產(chǎn)生隨機前綴,隨機數(shù)大小看情況決定
long i = (long) (Math.random() * 150);
//添加隨機數(shù)前綴
return new Tuple2(i + _ + t._1, t._2);
}
}).sortByKey()//進行一個Shuffle操作打亂Key
//去掉隨機數(shù)前綴
.mapToPair(new PairFunctiontuple2, String, agg() {
@Override
public Tuple2call(Tuple2t)
throws Exception {
String str = t.1.split()[0];
return new Tuple2(str, t._2);
}
});
} catch (Exception e) {
return null;
}
return mapToPair;
}
以上是解決RDD數(shù)據(jù)傾斜簡單方法。
-
RDD
+關(guān)注
關(guān)注
0文章
7瀏覽量
7985 -
大數(shù)據(jù)
+關(guān)注
關(guān)注
64文章
8897瀏覽量
137539
發(fā)布評論請先 登錄
相關(guān)推薦
評論