數(shù)據(jù)傾斜問題剖析
數(shù)據(jù)傾斜是分布式系統(tǒng)不可避免的問題,任何分布式系統(tǒng)都有幾率發(fā)生數(shù)據(jù)傾斜,但有些小伙伴在平時工作中感知不是很明顯。這里要注意本篇文章的標(biāo)題—“千億級數(shù)據(jù)”,為什么說千億級,因為如果一個任務(wù)的數(shù)據(jù)量只有幾百萬,它即使發(fā)生了數(shù)據(jù)傾斜,所有數(shù)據(jù)都跑到一臺機(jī)器去執(zhí)行,對于幾百萬的數(shù)據(jù)量,一臺機(jī)器執(zhí)行起來還是毫無壓力的,這時數(shù)據(jù)傾斜對我們感知不大,只有數(shù)據(jù)達(dá)到一個量級時,一臺機(jī)器應(yīng)付不了這么多數(shù)據(jù),這時如果發(fā)生數(shù)據(jù)傾斜,最后就很難算出結(jié)果。
所以就需要我們對數(shù)據(jù)傾斜的問題進(jìn)行優(yōu)化,盡量避免或減輕數(shù)據(jù)傾斜帶來的影響。
在解決數(shù)據(jù)傾斜問題之前,還要再提一句:沒有瓶頸時談?wù)搩?yōu)化,都是自尋煩惱。
大家想想,在map和reduce兩個階段中,最容易出現(xiàn)數(shù)據(jù)傾斜的就是reduce階段,因為map到reduce會經(jīng)過shuffle階段,在shuffle中默認(rèn)會按照key進(jìn)行hash,如果相同的key過多,那么hash的結(jié)果就是大量相同的key進(jìn)入到同一個reduce中,導(dǎo)致數(shù)據(jù)傾斜。
那么有沒有可能在map階段就發(fā)生數(shù)據(jù)傾斜呢,是有這種可能的。
一個任務(wù)中,數(shù)據(jù)文件在進(jìn)入map階段之前會進(jìn)行切分,默認(rèn)是128M一個數(shù)據(jù)塊,但是如果當(dāng)對文件使用GZIP壓縮等不支持文件分割操作的壓縮方式時,MR任務(wù)讀取壓縮后的文件時,是對它切分不了的,該壓縮文件只會被一個任務(wù)所讀取,如果有一個超大的不可切分的壓縮文件被一個map讀取時,就會發(fā)生map階段的數(shù)據(jù)傾斜。
所以,從本質(zhì)上來說,發(fā)生數(shù)據(jù)傾斜的原因有兩種:一是任務(wù)中需要處理大量相同的key的數(shù)據(jù)。二是任務(wù)讀取不可分割的大文件。
數(shù)據(jù)傾斜解決方案
MapReduce和Spark中的數(shù)據(jù)傾斜解決方案原理都是類似的,以下討論Hive使用MapReduce引擎引發(fā)的數(shù)據(jù)傾斜,Spark數(shù)據(jù)傾斜也可以此為參照。
1. 空值引發(fā)的數(shù)據(jù)傾斜
實(shí)際業(yè)務(wù)中有些大量的null值或者一些無意義的數(shù)據(jù)參與到計算作業(yè)中,表中有大量的null值,如果表之間進(jìn)行join操作,就會有shuffle產(chǎn)生,這樣所有的null值都會被分配到一個reduce中,必然產(chǎn)生數(shù)據(jù)傾斜。
之前有小伙伴問,如果A、B兩表join操作,假如A表中需要join的字段為null,但是B表中需要join的字段不為null,這兩個字段根本就join不上啊,為什么還會放到一個reduce中呢?
這里我們需要明確一個概念,數(shù)據(jù)放到同一個reduce中的原因不是因為字段能不能join上,而是因為shuffle階段的hash操作,只要key的hash結(jié)果是一樣的,它們就會被拉到同一個reduce中。
解決方案:
第一種:可以直接不讓null值參與join操作,即不讓null值有shuffle階段
SELECT * FROM log a JOIN users b ON a.user_id IS NOT NULL AND a.user_id = b.user_id UNION ALL SELECT * FROM log a WHERE a.user_id IS NULL;
第二種:因為null值參與shuffle時的hash結(jié)果是一樣的,那么我們可以給null值隨機(jī)賦值,這樣它們的hash結(jié)果就不一樣,就會進(jìn)到不同的reduce中:
SELECT * FROM log a LEFT JOIN users b ON CASE WHEN a.user_id IS NULL THEN concat(‘hive_’, rand()) ELSE a.user_id END = b.user_id;
2. 不同數(shù)據(jù)類型引發(fā)的數(shù)據(jù)傾斜
對于兩個表join,表a中需要join的字段key為int,表b中key字段既有string類型也有int類型。當(dāng)按照key進(jìn)行兩個表的join操作時,默認(rèn)的Hash操作會按int型的id來進(jìn)行分配,這樣所有的string類型都被分配成同一個id,結(jié)果就是所有的string類型的字段進(jìn)入到一個reduce中,引發(fā)數(shù)據(jù)傾斜。
解決方案:
如果key字段既有string類型也有int類型,默認(rèn)的hash就都會按int類型來分配,那我們直接把int類型都轉(zhuǎn)為string就好了,這樣key字段都為string,hash時就按照string類型分配了:
SELECT * FROM users a LEFT JOIN logs b ON a.usr_id = CAST(b.user_id AS string);
3. 不可拆分大文件引發(fā)的數(shù)據(jù)傾斜
當(dāng)集群的數(shù)據(jù)量增長到一定規(guī)模,有些數(shù)據(jù)需要?dú)w檔或者轉(zhuǎn)儲,這時候往往會對數(shù)據(jù)進(jìn)行壓縮;當(dāng)對文件使用GZIP壓縮等不支持文件分割操作的壓縮方式,在日后有作業(yè)涉及讀取壓縮后的文件時,該壓縮文件只會被一個任務(wù)所讀取。如果該壓縮文件很大,則處理該文件的Map需要花費(fèi)的時間會遠(yuǎn)多于讀取普通文件的Map時間,該Map任務(wù)會成為作業(yè)運(yùn)行的瓶頸。這種情況也就是Map讀取文件的數(shù)據(jù)傾斜。
解決方案:
這種數(shù)據(jù)傾斜問題沒有什么好的解決方案,只能將使用GZIP壓縮等不支持文件分割的文件轉(zhuǎn)為bzip和zip等支持文件分割的壓縮方式。
所以,我們在對文件進(jìn)行壓縮時,為避免因不可拆分大文件而引發(fā)數(shù)據(jù)讀取的傾斜,在數(shù)據(jù)壓縮的時候可以采用bzip2和Zip等支持文件分割的壓縮算法。
4. 數(shù)據(jù)膨脹引發(fā)的數(shù)據(jù)傾斜
在多維聚合計算時,如果進(jìn)行分組聚合的字段過多,如下:
select a,b,c,count(1)from log group by a,b,c with rollup;
注:對于最后的with rollup關(guān)鍵字不知道大家用過沒,with rollup是用來在分組統(tǒng)計數(shù)據(jù)的基礎(chǔ)上再進(jìn)行統(tǒng)計匯總,即用來得到group by的匯總信息。
如果上面的log表的數(shù)據(jù)量很大,并且Map端的聚合不能很好地起到數(shù)據(jù)壓縮的情況下,會導(dǎo)致Map端產(chǎn)出的數(shù)據(jù)急速膨脹,這種情況容易導(dǎo)致作業(yè)內(nèi)存溢出的異常。如果log表含有數(shù)據(jù)傾斜key,會加劇Shuffle過程的數(shù)據(jù)傾斜。
解決方案:
可以拆分上面的sql,將with rollup拆分成如下幾個sql:
SELECT a, b, c, COUNT(1) FROM log GROUP BY a, b, c; SELECT a, b, NULL, COUNT(1) FROM log GROUP BY a, b; SELECT a, NULL, NULL, COUNT(1) FROM log GROUP BY a; SELECT NULL, NULL, NULL, COUNT(1) FROM log;
但是,上面這種方式不太好,因為現(xiàn)在是對3個字段進(jìn)行分組聚合,那如果是5個或者10個字段呢,那么需要拆解的SQL語句會更多。
在Hive中可以通過參數(shù) hive.new.job.grouping.set.cardinality 配置的方式自動控制作業(yè)的拆解,該參數(shù)默認(rèn)值是30。表示針對grouping sets/rollups/cubes這類多維聚合的操作,如果最后拆解的鍵組合大于該值,會啟用新的任務(wù)去處理大于該值之外的組合。如果在處理數(shù)據(jù)時,某個分組聚合的列有較大的傾斜,可以適當(dāng)調(diào)小該值。
5. 表連接時引發(fā)的數(shù)據(jù)傾斜
兩表進(jìn)行普通的repartition join時,如果表連接的鍵存在傾斜,那么在 Shuffle 階段必然會引起數(shù)據(jù)傾斜。
解決方案:
通常做法是將傾斜的數(shù)據(jù)存到分布式緩存中,分發(fā)到各個Map任務(wù)所在節(jié)點(diǎn)。在Map階段完成join操作,即MapJoin,這避免了 Shuffle,從而避免了數(shù)據(jù)傾斜。
MapJoin是Hive的一種優(yōu)化操作,其適用于小表JOIN大表的場景,由于表的JOIN操作是在Map端且在內(nèi)存進(jìn)行的,所以其并不需要啟動Reduce任務(wù)也就不需要經(jīng)過shuffle階段,從而能在一定程度上節(jié)省資源提高JOIN效率。
在Hive 0.11版本之前,如果想在Map階段完成join操作,必須使用MAPJOIN來標(biāo)記顯示地啟動該優(yōu)化操作,由于其需要將小表加載進(jìn)內(nèi)存所以要注意小表的大小。
如將a表放到Map端內(nèi)存中執(zhí)行,在Hive 0.11版本之前需要這樣寫:
select /* +mapjoin(a) */ a.id , a.name, b.age from a join b on a.id = b.id;
如果想將多個表放到Map端內(nèi)存中,只需在mapjoin()中寫多個表名稱即可,用逗號分隔,如將a表和c表放到Map端內(nèi)存中,則 /* +mapjoin(a,c) */ 。
在Hive 0.11版本及之后,Hive默認(rèn)啟動該優(yōu)化,也就是不在需要顯示的使用MAPJOIN標(biāo)記,其會在必要的時候觸發(fā)該優(yōu)化操作將普通JOIN轉(zhuǎn)換成MapJoin,可以通過以下兩個屬性來設(shè)置該優(yōu)化的觸發(fā)時機(jī):
hive.auto.convert.join=true 默認(rèn)值為true,自動開啟MAPJOIN優(yōu)化。
hive.mapjoin.smalltable.filesize=2500000 默認(rèn)值為2500000(25M),通過配置該屬性來確定使用該優(yōu)化的表的大小,如果表的大小小于此值就會被加載進(jìn)內(nèi)存中。
注意:使用默認(rèn)啟動該優(yōu)化的方式如果出現(xiàn)莫名其妙的BUG(比如MAPJOIN并不起作用),就將以下兩個屬性置為fase手動使用MAPJOIN標(biāo)記來啟動該優(yōu)化:
hive.auto.convert.join=false (關(guān)閉自動MAPJOIN轉(zhuǎn)換操作)
hive.ignore.mapjoin.hint=false (不忽略MAPJOIN標(biāo)記)
再提一句:將表放到Map端內(nèi)存時,如果節(jié)點(diǎn)的內(nèi)存很大,但還是出現(xiàn)內(nèi)存溢出的情況,我們可以通過這個參數(shù) mapreduce.map.memory.mb 調(diào)節(jié)Map端內(nèi)存的大小。
6. 確實(shí)無法減少數(shù)據(jù)量引發(fā)的數(shù)據(jù)傾斜
在一些操作中,我們沒有辦法減少數(shù)據(jù)量,如在使用 collect_list 函數(shù)時:
select s_age,collect_list(s_score) list_score from student group by s_age
collect_list:將分組中的某列轉(zhuǎn)為一個數(shù)組返回。
在上述sql中,s_age如果存在數(shù)據(jù)傾斜,當(dāng)數(shù)據(jù)量大到一定的數(shù)量,會導(dǎo)致處理傾斜的reduce任務(wù)產(chǎn)生內(nèi)存溢出的異常。
注:collect_list輸出一個數(shù)組,中間結(jié)果會放到內(nèi)存中,所以如果collect_list聚合太多數(shù)據(jù),會導(dǎo)致內(nèi)存溢出。
有小伙伴說這是 group by 分組引起的數(shù)據(jù)傾斜,可以開啟hive.groupby.skewindata參數(shù)來優(yōu)化。我們接下來分析下:
開啟該配置會將作業(yè)拆解成兩個作業(yè),第一個作業(yè)會盡可能將Map的數(shù)據(jù)平均分配到Reduce階段,并在這個階段實(shí)現(xiàn)數(shù)據(jù)的預(yù)聚合,以減少第二個作業(yè)處理的數(shù)據(jù)量;第二個作業(yè)在第一個作業(yè)處理的數(shù)據(jù)基礎(chǔ)上進(jìn)行結(jié)果的聚合。
hive.groupby.skewindata的核心作用在于生成的第一個作業(yè)能夠有效減少數(shù)量。但是對于collect_list這類要求全量操作所有數(shù)據(jù)的中間結(jié)果的函數(shù)來說,明顯起不到作用,反而因為引入新的作業(yè)增加了磁盤和網(wǎng)絡(luò)I/O的負(fù)擔(dān),而導(dǎo)致性能變得更為低下。
解決方案:
這類問題最直接的方式就是調(diào)整reduce所執(zhí)行的內(nèi)存大小。
調(diào)整reduce的內(nèi)存大小使用mapreduce.reduce.memory.mb這個配置。
原文標(biāo)題:Hive 千億級數(shù)據(jù)傾斜解決方案(好文收藏)
文章出處:【微信公眾號:數(shù)據(jù)分析與開發(fā)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
責(zé)任編輯:haq
-
數(shù)據(jù)
+關(guān)注
關(guān)注
8文章
7067瀏覽量
89108 -
內(nèi)存
+關(guān)注
關(guān)注
8文章
3028瀏覽量
74098
原文標(biāo)題:Hive 千億級數(shù)據(jù)傾斜解決方案(好文收藏)
文章出處:【微信號:DBDevs,微信公眾號:數(shù)據(jù)分析與開發(fā)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論