一、數據傾斜的基本概念
01 什么是數據傾斜?
用最通俗易懂的話來說,數據傾斜無非就是大量的相同key被partition分配到一個分區里,造成了'一個人累死,其他人閑死'的情況,這種情況是我們不能接受的,這也違背了并行計算的初衷,首先一個節點要承受著巨大的壓力,而其他節點計算完畢后要一直等待這個忙碌的節點,也拖累了整體的計算時間,可以說效率是十分低下的。
02? 數據傾斜發生時的現象?
(1)絕大多數task執行得都非常快,但個別task執行的極慢。
(2)原本能正常執行的Spark作業,某天突然爆出OOM(內存溢出)異常。觀察異常棧,是我們寫的業務代碼造成的。
03 通用的常規解決方案
(1)增加jvm內存,這適用于第一種情況(唯一值非常少,極少數值有非常多的記錄值(唯一值少于幾千)),這種情況下,往往只能通過硬件的手段來進行調優,增加jvm內存可以顯著的提高運行效率。
(2)增加reduce的個數,這適用于第二種情況(唯一值比較多,這個字段的某些值有遠遠多于其他值的記錄數,但是它的占比也小于百分之一或千分之一),我們知道,這種情況下,最容易造成的結果就是大量相同key被partition到一個分區,從而一個reduce執行了大量的工作,而如果我們增加了reduce的個數,這種情況相對來說會減輕很多,畢竟計算的節點多了,就算工作量還是不均勻的,那也要小很多。
(3)自定義分區,這需要用戶自己繼承partition類,指定分區策略,這種方式效果比較顯著。
(4)重新設計key,有一種方案是在map階段時給key加上一個隨機數,有了隨機數的key就不會被大量的分配到同一節點(小幾率),待到reduce后再把隨機數去掉即可。
(5)使用combinner合并,combinner是在map階段,reduce之前的一個中間階段,在這個階段可以選擇性的把大量的相同key數據先進行一個合并,可以看做是local reduce,然后再交給reduce來處理,這樣做的好。
04 通用定位發生數據傾斜的代碼
(1)數據傾斜只會發生在shuffle中,下面是常用的可能會觸發shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出現數據傾斜時,可能就是代碼中使用了這些算子的原因。
(2)通過觀察spark UI,定位數據傾斜發生在第幾個stage中,如果是用yarn-client模式提交,那么本地是可以直接看到log的,可以在log中找到當前運行到了第幾個stage;如果用yarn-cluster模式提交,可以通過Spark Web UI 來查看當前運行到了第幾個stage。此外,無論是使用了yarn-client模式還是yarn-cluster模式,我們都可以在Spark Web UI 上深入看一下當前這個stage各個task分配的數據量,從而進一步確定是不是task分配的數據不均勻導致了數據傾斜。
二、 Hive數據傾斜
1、Hive的執行是分階段的,map處理數據量的差異取決于上一個stage的reduce輸出,所以如何將數據均勻的分配到各個reduce中,就是解決數據傾斜的根本所在。
2 、造成數據傾斜的原因
1)、key分布不均勻
2)、業務數據本身的特性
3)、建表時考慮不周
4)、某些SQL語句本身就有數據傾斜
3 、數據傾斜的表現:
數據傾斜出現在SQL算子中包含join/group by/等聚合操作時,大量的相同KEY被分配到少量的reduce去處理。導致絕大多數TASK執行得都非常快,但個別TASK執行的極慢,原本能正常執行的作業,某天突然爆出OOM(內存溢出)異常。任務進度長時間維持在99%(或100%)。任務監控頁面,發現只有少量(1個或幾個)reduce子任務未完成。因為其處理的數據量和其他reduce差異過大。單一reduce的記錄數與平均記錄數差異過大,通常可能達到3倍甚至更多。 最長時長遠大于平均時長。可以查看具體job的reducer counter計數器協助定位。
4、數據傾斜的解決方案:
1)參數調節:
hive.map.aggr=true(是否在Map端進行聚合,默認為true),這個設置可以將頂層的聚合操作放在Map階段執行,從而減輕清洗階段數據傳輸和Reduce階段的執行時間,提升總體性能 Set hive.groupby.skewindata=true(hive自動進行負載均衡)
2)SQL語句調節
a、如何Join: 關于驅動表的選取,選用join key分布最均勻的表作為驅動表。 做好列裁剪和filter操作,以達到兩表做join的時候,數據量相對變小的效果,避免笛卡爾積。 Hive中進行表的關聯查詢時,盡可能將較大的表放在Join之后。
b、大小表Join,開啟mapjoin
mapjoin的原理: MapJoin 會把小表全部讀入內存中,在map階段直接拿另外一個表的數據和內存中表數據做匹配,由于在map是進行了join操作,省去了reduce 階段,運行的效率就會高很多。參與連接的小表的行數,以不超過2萬條為宜,大小不超過25M。
設置參數
set hive.auto.convert.join=true; hive.mapjoin.smalltable.filesize=25000000( 即25M)?手動指定
-- a 表是大表,數據量是百萬級別
-- b 表是小表,數據量在百級別,mapjion括號中的b就是指定哪張表為小表
select /*+mapjoin(b)*/ a.field1asfield1, b.field2asfield2, b.field3asfield3 fromaleftjoinb on a.field1 = b.field1;c、大表Join大表:
null值不參與連接,簡單舉例
select field1,field2,field3… fromlogaleftjoinuserbona.useridisnotnullanda.userid=b.userid unionselectfield1,field2,field3fromlogwhereuseridisnull;
將熱點key打散,但是需要注意,盡量不要在join時,對關聯key使用rand()函數。因為在hive中當遇到map失敗重算時,就會出現數據重復(數據丟失)的問題,spark引擎使用rand容易導致task失敗重新計算的時候偶發不一致的問題。可以使用md5加密唯一維度值的方式替代rand(), 比如: md5(concat(coalesce(sku_id, 0), '_', coalesce(dim_store_num, 0), '_', coalesce(store_id, 0), '_',coalesce(delv_center_id, 0))),其中concat的字段是表的唯一粒度;也可以使用hash。
d、count distinct大量相同特殊值,使用sum...group by代替count(distinct ) 例如
selecta,count(distinctb)fromtgroupbya 可以寫成selecta,sum(1)from(selecta,bfromtgroupbya,b)groupbya;
select count (distinct key) from a 可以寫成 Select sum(1) from (Select key from a group by key) t特殊情況特殊處理:在業務邏輯優化效果的不大情況下,有些時候是可以將傾斜的數據單獨拿出來處理。最后union回去
e、 不管是join還是groupby 請先在內層先進行數據過濾,建議只保留需要的key值
f、 取最大最小值盡量使用min/max;不要采用row_number
g、 不要直接select * ;在內層做好數據過濾
h、 盡量使用sort by替換order by
i、 明確數據源,有上層匯總的就不要使用基礎fdm或明細表
J、join避免多對多關聯
在join鏈接查詢時,確認是否存在多對多的關聯,起碼保證有一個表的結果集的關聯字段不重復。
5、典型的業務場景舉例
(1)空值產生的數據傾斜
場景:如日志中,常會有信息丟失的問題,比如日志中的 user_id,如果取其中的 user_id 和 用戶表中的user_id 關聯,會碰到數據傾斜的問題。
解決方法1: user_id為空的不參與關聯
select * from log a join users b on a.user_id is not null and a.user_id = b.user_idunion allselect * from log a where a.user_id is null;(2)不同數據類型關聯產生數據傾斜
場景:用戶表中user_id字段為int,log表中user_id字段既有string類型也有int類型。當按照user_id進行兩個表的Join操作時,默認的Hash操作會按int型的id來進行分配,這樣會導致所有string類型id的記錄都分配到一個Reducer中。
解決方法:把數字類型轉換成字符串類型
select * from users a left outer join logs b on a.usr_id = cast(b.user_id as string)(3)小表不小不大,怎么用 map join 解決傾斜問題
使用 map join 解決小表(記錄數少)關聯大表的數據傾斜問題,這個方法使用的頻率非常高,但如果小表很大,大到map join會出現bug或異常,這時就需要特別的處理 。
select * from log a left outer join users b on a.user_id = b.user_id;users 表有 600w+ 的記錄,把 users 分發到所有的 map 上也是個不小的開銷,而且 map join 不支持這么大的小表。如果用普通的 join,又會碰到數據傾斜的問題。 解決方法:
select /*+mapjoin(x)*/* from log a left outer join ( select /*+mapjoin(c)*/d.* from ( select distinct user_id from log ) c join users d on c.user_id = d.user_id ) x on a.user_id = b.user_id;log里user_id有上百萬個,這就又回到原來map join問題。所幸,每日的會員uv不會太多,有交易的會員不會太多,有點擊的會員不會太多,有傭金的會員不會太多等等。所以這個方法能解決很多場景下的數據傾斜問題。
(4)業務邏輯突發熱key的處理(真實線上問題) 業務場景舉例:
流量數據多個設備號對應了一個安裝id,突發某幾個安裝id數量級特別大。在歸一環節中,按照安裝id進行分發reduce,再進行處理,異常熱key會造成單一節點處理數據量大,由于數據傾斜從而導致任務卡死的情況。
解決方案:基于小時任務,提前設置一個異常范圍,把異常安裝id和對應的aid撈出來,寫到維表里面。按照歸一邏輯,優先使用aid值作為歸一結果,所以在歸一任務中,讀取異常值,隨機分發到reduce中,并將aid賦值給歸一字段,這樣就避免了熱點處理。
總結:
1、對于join,在判斷小表不大于1G的情況下,使用map join
2、對于group by或distinct,設定 hive.groupby.skewindata=true
3、盡量使用上述的SQL語句調節進行優化
6、數據傾斜的監控預防
(1)測試的時候需要關注數據分布,針對不同日期、關鍵指標、重點key、枚舉值等
(2)增加數據質量監控,數據計算的每層任務增加數據質量監控。
(3)L0任務,大數據平臺需要有健康度巡檢,對資源、參數配置,數據傾斜、穩定性等做任務健康度打分,從而發現數據傾斜的趨勢,及早檢查任務
三、spark數據傾斜
Spark優化數據傾斜的思路,join方式從SMJ方式改成BMJ的方式,但是只適合大小表的情況。優化思路一般是: 改join方式,開啟spark自適應框架,優化sql。
1、開啟sparksql的數據傾斜時的自適應關聯優化
spark.shuffle.statistics.verbose=true打開后MapStatus會采集每個partition條數的信息,用于傾斜處理。
2 、Sortmergejoin 改成 BroadcastHashJoin。調大BroadcastHashJoin的閾值。
在某些場景下可以把SortMergeJoin轉化成BroadcastHashJoin而避免shuffle產生的數據傾斜。 增加參數:
spark.sql.autoBroadcastJoinThreshold=524288000將BHJ的閾值提高到500M
3、優化sql同hive
4、傾斜KEY查找
需要結合實際業務代碼,查找到引起Shuffle的算子,并按照以下兩種方式查找大KEY。?
方式一:通過SQL抽樣傾斜KEY
適用場景:如果數據量比較小的情況下,通過SQL的方式驗證比較便捷 。
操作步驟:
1、針對KEY進行數量統計
2、按照數量從大到小進行排序
3、直接取 limit N 即可?
方式二:通過sample抽樣傾斜KEY
適用場景:如果數據量很大,可以通過抽樣進行抽取大KEY。能否抽取到大KEY一般和抽取數據比例有關系。
操作步驟:
1、對KEY賦值為1,便于下一步進行計數
2、對KEY進行累計
3、對KEY和VALUE交換
4、針對KEY按照字典進行倒排
5、將KEY和VAlUE位置交換,還原到真實的
6、從已排序的RDD中,直接取前N條
數據傾斜一般由Shuffle時數據不均勻導致,一般有三類算子會產生Shuffle:Aggregation (groupBy)、Join、Window。 01 Aggregation
建議打散key進行二次聚合:采用對 非constant值、與key無關 的列進行hash取模,不要使用rand類函數。
dataframe .groupBy(col("key"),pmod(hash(col("some_col")),100)).agg(max("value").as("partial_max")) .groupBy(col("key")).agg(max("partial_max").as("max"))02? Window
目前支持該模式下的傾斜window,(僅支持3.0)
select (... row_number() over(partition by ... order by ...) as rn) wherern[==|<=|<]?k?and?other?conditionsspark.sql.rankLimit.enabled=true?(目前支持基于row_number的topK計算邏輯)03? Shuffled Join
Spark 2.4開啟參數
spark.sql.adaptive.enabled=true spark.shuffle.statistics.verbose=true spark.sql.adaptive.skewedJoin.enabled=true spark.sql.adaptive.allowAdditionalShuffle=true如果不能處理,建議用戶自行定位熱點數據進行處理 Spark 3.0
spark.sql.adaptive.enabled=true spark.sql.adaptive.skewJoin.enabled=true spark.sql.adaptive.skewJoin.enhance.enabled=true (通用傾斜算法,可處理更多場景) spark.sql.adaptive.forceOptimizeSkewedJoin=true(允許插入額外shuffle,可處理更多場景)
其他參數:
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (默認為256MB,分區大小超過該閾值才可被識別為傾斜分區,如果希望調整的傾斜分區小于該閾值,可以酌情調小)?
spark.sql.adaptive.skewJoin.skewedPartitionFactor (默認為5,分區大小超過中位數Xfactor才可被識別為傾斜分區,一般不需要調整)? spark.sql.adaptive.skewJoin.enhance.maxJoins (默認5,通用傾斜算法中,如果shuffled join超過此閾值則不處理,一般不需要調整)? spark.sql.adaptive.skewJoin.enhance.maxSplitsPerPartition (默認1000,通用傾斜算法中,盡量使得每個傾斜分區的劃分不超過該閾值,一般不需要調整)?
04 數據膨脹(Join)
spark.sql.adaptive.skewJoin.inflation.enabled=true(默認false,由于采樣計算會導致性能回歸,正常任務不要開啟) spark.sql.adaptive.skewJoin.inflation.factor=50(默認為100,預估的分區輸出大小超過中位數Xfactor才可被識別為膨脹分區,由于預估算法存在誤差,一般不要低于50) spark.sql.adaptive.shuffle.sampleSizePerPartition=500(默認100,每個Task中的采樣數,基于該采樣數據預估Join之后的分區大小,如果Task數量不大,可以酌情調大)05 傾斜key檢測(Join)
由于Join語義限制,對于A left join skewed B之類的場景,無法對B進行劃分處理,否則會導致數據正確性問題,這也是Spark項目所面臨的難題。如果開啟以上功能依然不能處理數據傾斜,可以通過開啟傾斜key檢測功能來定位是哪些key導致了傾斜或膨脹,繼而進行過濾等處理。
spark.sql.adaptive.shuffle.detectSkewness=true(默認false,由于采樣計算會導致性能回歸,正常任務不要開啟)其他參數:
spark.sql.adaptive.shuffle.sampleSizePerPartition=100(默認100,每個Task中的采樣數,如果Task數量不大,可以酌情調大)
審核編輯機:劉清
-
計數器
+關注
關注
32文章
2256瀏覽量
94485 -
SQL
+關注
關注
1文章
762瀏覽量
44117 -
RDD
+關注
關注
0文章
7瀏覽量
7972 -
JVM
+關注
關注
0文章
158瀏覽量
12220
原文標題:淺談離線數據傾斜
文章出處:【微信號:OSC開源社區,微信公眾號:OSC開源社區】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論