在 LinkedIn,我們非常依賴離線數(shù)據(jù)分析來進行數(shù)據(jù)驅(qū)動的決策。多年來,Apache Spark 已經(jīng)成為 LinkedIn 的主要計算引擎,以滿足這些數(shù)據(jù)需求。憑借其獨特的功能,Spark 為 LinkedIn 的許多關(guān)鍵業(yè)務(wù)提供支持,包括數(shù)據(jù)倉庫、數(shù)據(jù)科學(xué)、AI/ML、A/B 測試和指標報告。需要大規(guī)模數(shù)據(jù)分析的用例數(shù)量也在快速增長。從 2017 年到現(xiàn)在,LinkedIn 的 Spark 使用量同比增長了大約 3 倍。因此,LinkedIn 的 Spark 引擎現(xiàn)在運行在一個龐大的基礎(chǔ)設(shè)施之上。我們的生產(chǎn)集群有 10,000 多個節(jié)點,Spark 作業(yè)現(xiàn)在占集群計算資源使用量的 70% 以上,每日處理的數(shù)據(jù)量達數(shù)十 PB。解決擴展性挑戰(zhàn)以確保我們的 Spark 計算基礎(chǔ)設(shè)施可持續(xù)發(fā)展是 LinkedIn Spark 團隊工作的核心。
盡管 Apache Spark 有很多好處,這使得它在 LinkedIn 和整個行業(yè)中都很受歡迎,但在我們的數(shù)據(jù)規(guī)模下使用 Spark 時,我們?nèi)匀挥龅搅艘恍┨魬?zhàn)。正如我們在 Spark + AI Summit 2020 演講中所述,這些挑戰(zhàn)涉及多個層面,包括計算資源管理、計算引擎可擴展性和用戶生產(chǎn)率。我們將在這篇博文中關(guān)注 Spark shuffle 可擴展性挑戰(zhàn),并介紹 Magnet,一種新穎的基于推送模式的 shuffle 服務(wù)(push-based shuffle service)。
Shuffle 基礎(chǔ)知識
數(shù)據(jù) shuffle 是 MapReduce 計算范式中的一項重要操作,為 Apache Spark 和許多其他現(xiàn)代大數(shù)據(jù)計算引擎提供動力。shuffle 操作基本上是通過相應(yīng)階段的 map 和 reduce 任務(wù)之間的 all-to-all 連接來傳輸中間數(shù)據(jù)。通過 shuffle,數(shù)據(jù)將根據(jù)每個記錄的分區(qū)鍵中的值在所有的 shuffle 分區(qū)上進行適當?shù)姆謪^(qū)。如圖 1 所示:
雖然 shuffle 操作的基本概念很簡單,但不同的計算引擎采用了不同的方法來實現(xiàn)它。在 LinkedIn,我們在 Apache YARN 之上運行 Spark,并利用 Spark 的External Shuffle Service (ESS) 來進行 shuffle 操作。如圖 2 所示:
通過這種設(shè)置,計算集群中的每個節(jié)點都部署了一個 Spark ESS 實例。當 Spark executors 運行 shuffle 的 mapper 任務(wù)時,這些任務(wù)會在本地磁盤上生成 shuffle 文件。每個 shuffle 文件由多個 shuffle 塊組成,每個塊代表屬于相應(yīng) shuffle 分區(qū)的數(shù)據(jù)。生成這些 shuffle 文件后,本地 Spark ESS 實例知道在哪里可以找到這些 shuffle 文件以及不同 mapper 任務(wù)生成的各個 shuffle 塊。當 Spark 執(zhí)行器開始運行同一個 shuffle 的 reducer 任務(wù)時,這些任務(wù)將從 Spark driver 獲取有關(guān)在哪里可以找到 shuffle 塊的信息作為它們的任務(wù)輸入。然后這些 reducer 會向遠程 Spark ESS 實例發(fā)送請求,嘗試獲取它們對應(yīng)的 shuffle 塊。Spark ESS 收到此類請求后,將從本地磁盤讀取相應(yīng)的 shuffle 塊并將數(shù)據(jù)發(fā)送回 reducer。
挑戰(zhàn)
Spark 現(xiàn)有的 shuffle 機制在性能和容錯要求之間取得了很好的平衡。然而,當在我們的規(guī)模下運行 Spark shuffle 時,我們經(jīng)歷了多個挑戰(zhàn),這使得 shuffle 操作成為我們基礎(chǔ)設(shè)施中的擴展和性能瓶頸。
可靠性問題
第一個挑戰(zhàn)是可靠性問題。在我們的生產(chǎn)集群中,由于計算節(jié)點數(shù)據(jù)量大和 shuffle 工作負載的規(guī)模,我們注意到集群高峰時段的 shuffle 服務(wù)可用性問題。這可能會導(dǎo)致 shuffle fetch 失敗,從而導(dǎo)致昂貴的 stage 重試,這可能非常具有破壞性,因為它們會導(dǎo)致工作流 SLA 違規(guī)和作業(yè)失敗。圖 3 進一步說明了這一點,其中顯示了由于 shuffle 導(dǎo)致的每日 Spark stage 失敗的數(shù)量。到 2019 年底,這一趨勢越來越嚴重,每天有數(shù)百到一千多個 stage 因為這個失敗。
效率問題
第二個挑戰(zhàn)是效率問題。在 LinkedIn,我們將 shuffle 文件存儲在 HDD 上。由于 reducer 的 shuffle fetch 請求是隨機到達的,因此 shuffle 服務(wù)也會隨機訪問 shuffle 文件中的數(shù)據(jù)。如果單個 shuffle 塊大小較小,則 shuffle 服務(wù)產(chǎn)生的小隨機讀取會嚴重影響磁盤吞吐量,從而延長 shuffle fetch 等待時間。這也在下圖中進行了說明。正如下圖所示,在 2020 年 3 月至 8 月之間,我們生產(chǎn)集群的 10-20% 的計算資源浪費在 shuffle 上,在等待遠程 shuffle 數(shù)據(jù)時處于空閑狀態(tài)。
擴展性問題
第三個挑戰(zhàn)是擴展問題。由于 external shuffle service 是我們基礎(chǔ)架構(gòu)中的共享服務(wù),因此一些對 shuffle services 錯誤調(diào)優(yōu)的作業(yè)也會影響其他作業(yè)。當一個作業(yè)錯誤地配置導(dǎo)致產(chǎn)生許多小的 shuffle blocks 將會給 shuffle 服務(wù)帶來壓力時,它不僅會給自身帶來性能下降,還會使共享相同 shuffle 服務(wù)的所有相鄰作業(yè)的性能下降。這可能會導(dǎo)致原本正常運行的作業(yè)出現(xiàn)不可預(yù)測的運行時延遲,尤其是在集群高峰時段。
Magnet shuffle service
為了解決這些問題,我們設(shè)計并實現(xiàn)了 Magnet,這是一種新穎的基于推送(push-based)的 shuffle 服務(wù)。Magnet 項目在今年早些時候作為 VLDB 2020 上發(fā)表的工業(yè)跟蹤論文首次向社區(qū)亮相,您可以在此處閱讀我們的 VLDB 論文:Magnet: Push-based Shuffle Service for Large-scale Data Processing。最近,我們正在將 Magnet 回饋給 Apache Spark 社區(qū)。這篇博文的其余部分將介紹 Magnet 背后的高級設(shè)計及其在生產(chǎn)中的性能,但感興趣的讀者可以到 SPARK-30602 中獲取有關(guān)該工作的更新以及 SPIP 文檔以獲取實現(xiàn)級別的詳細信息。
Push-based shuffle
Magnet shuffle 服務(wù)背后的核心思想是基于推送的 shuffle 概念,其中 mapper 生成的 shuffle 塊也被推送到遠程 shuffle 服務(wù),以按每個 shuffle 分區(qū)進行合并。push-based shuffle 的 shuffle 寫入路徑如下圖所示:
在 map 任務(wù)生成其 shuffle 文件后,它準備將 shuffle 塊推送到遠程 ESS。它將 shuffle 文件中的連續(xù)塊組裝成 MB 大小的塊,并將該組數(shù)據(jù)推到相應(yīng)的 ESS。大于特定大小的 Shuffle 塊將被跳過,因此我們不會推送可能來自大型傾斜分區(qū)的塊。map 任務(wù)以一致的方式確定這個分組和相應(yīng)的 ESS 目的地,從而將屬于同一個 shuffle 分區(qū)的不同 mappers 的塊推到同一 ESS。分組完成后,這些塊的傳輸將移交給專用線程池,然后 map 任務(wù)就完成了。通過這種方式,我們將任務(wù)執(zhí)行線程與塊傳輸線程解耦,在 I/O 密集型數(shù)據(jù)傳輸和 CPU 密集型任務(wù)執(zhí)行之間實現(xiàn)了更好的并行性。ESS 接受遠程推送的 shuffle 塊,并將相同分區(qū)的 shuffle 數(shù)據(jù)合并到相應(yīng)的 shuffle 文件中。這是以盡力而為的方式完成的,這并不能保證所有塊都被合并。但是,ESS 確實保證在合并期間不會發(fā)生數(shù)據(jù)重復(fù)或損壞。
在基于推送的 shuffle 數(shù)據(jù)讀取路徑上,reduce 任務(wù)可以從合并的 shuffle 文件和 map 任務(wù)生成的原始 shuffle 文件中獲取其任務(wù)輸入(如上圖)。ESS 在從合并的 shuffle 文件中讀取時可以執(zhí)行大的順序 I/O 而不是小的隨機 I/O,從而顯著提高 I/O 效率。利用這一點,reduce 任務(wù)更愿意從合并的 shuffle 文件中獲取它們的輸入。由于塊推送/合并過程是盡力而為的,reduce 任務(wù)可以使用未合并的塊來填充合并的 shuffle 文件中的任何漏洞。如果合并的 shuffle 文件變得不可用,他們甚至可以完全回退到獲取未合并的塊。Magnet 的高效磁盤 I/O 模式進一步為構(gòu)建高性能 Spark 集群提供了更大的靈活性,因為它更少依賴 SSD 來實現(xiàn)良好的 shuffle 性能。
Spark driver 負責(zé)協(xié)調(diào) map 和 reduce 任務(wù)中基于推送的 shuffle。在 shuffle 寫入路徑上,Spark driver 為給定 shuffle 的 map 任務(wù)確定要使用的 ESS 列表。這個 ESS 列表作為任務(wù)上下文的一部分發(fā)送到 Spark executors,這使 map 任務(wù)能夠在塊組和遠程 ESS 目的地之間提出上述一致的映射。Spark 驅(qū)動程序進一步協(xié)調(diào) map 和 reduce 階段之間的轉(zhuǎn)換。一旦所有的 map 任務(wù)都完成了,Spark 驅(qū)動程序會等待一段可配置的時間,然后通知所有選擇的 ESS 進行這次 shuffle 以完成合并操作。當 ESS 收到終結(jié)請求時,它停止接受來自給定 shuffle 的任何新塊。它還向驅(qū)動程序響應(yīng)每個最終 shuffle 分區(qū)的元數(shù)據(jù)列表,其中包括有關(guān)合并的 shuffle 文件的位置和大小信息以及指示哪些塊已合并的位圖。一旦 Spark 驅(qū)動程序從所有 ESS 接收到此類元數(shù)據(jù),它就會啟動 reduce 階段。此時,Spark 驅(qū)動程序擁有 shuffle 數(shù)據(jù)位置的完整視圖,現(xiàn)在在合并的 shuffle 文件和原始 shuffle 文件之間進行了 2 次復(fù)制。Spark 驅(qū)動程序利用此信息來協(xié)調(diào) reduce 任務(wù)的輸入位置。此外,合并的 shuffle 文件的位置為 reduce 任務(wù)創(chuàng)建了自然的位置偏好。Spark 驅(qū)動程序利用該信息可以在存儲了 shuffle 信息的機器上啟動 reduce 任務(wù),如下圖所示:
push-based shuffle 的優(yōu)勢
Push-based shuffle 為 Spark shuffle 帶來了幾個關(guān)鍵好處。
提高磁盤 I/O 效率
使用 push-based shuffle,shuffle 服務(wù)在訪問 shuffle 文件中的 shuffle 數(shù)據(jù)時,從小的隨機讀取切換到大的順序讀取,顯著提高了磁盤 I/O 效率,特別是對于基于 HDD 的 shuffle 存儲。在 shuffle 寫路徑上,即使對小塊進行兩次 shuffle 數(shù)據(jù)寫入,整體的 I/O 效率還是有提升的。這是因為小的隨機寫入可以從多個級別的緩存中受益,例如操作系統(tǒng)頁面緩存和磁盤緩沖區(qū)。因此,小隨機寫入可以實現(xiàn)比小隨機讀取高得多的吞吐量。改進的磁盤 I/O 效率的效果反映在本博文后面顯示的性能數(shù)據(jù)中。關(guān)于 I/O 效率提升的更詳細分析包含在我們的 VLDB 論文中。
緩解 shuffle 的可靠性/可擴展性問題
Spark 原生 shuffle 操作要成功,需要每個 reduce 任務(wù)成功地從所有 map 任務(wù)中獲取每個相應(yīng)的 shuffle 塊,這在擁有數(shù)千個節(jié)點的繁忙集群中通常無法滿足。Magnet 通過多種方式實現(xiàn)了更好的 shuffle 可靠性:
?Magnet 采用盡力而為的方法來合并塊。塊推送/合并過程中的失敗不會影響 shuffle 的過程。
?通過基于推送的 shuffle,Magnet 有效地生成了 shuffle 中間數(shù)據(jù)的第二個副本。只有在無法從原始 shuffle 文件或合并的 shuffle 文件中獲取 shuffle 塊時,才會發(fā)生 shuffle fetch 失敗。
通過 reduce 任務(wù)的位置感知調(diào)度,它們通常在合并后的 shuffle 文件所在的機器上啟動,這允許它們繞過 ESS 讀取 shuffle 數(shù)據(jù)。這使得 reduce 任務(wù)對 ESS 可用性或性能問題更具彈性,從而緩解前面提到的可擴展性問題。
在塊推送過程中處理 stragglers
在 Spark 的普通 shuffle 操作中,由于多個任務(wù)通常是并發(fā)運行的,任務(wù)中的掉隊者(即一些任務(wù)運行速度明顯慢于其他任務(wù))的影響可以被其他任務(wù)隱藏。使用基于推送的 shuffle,如果在塊推送操作中有任何落后者,它可能會暫停執(zhí)行很長時間的作業(yè)。這是因為塊推送操作介于 shuffle map 和 reduce 階段之間。當存在落后者時,可能根本沒有任務(wù)在運行。然而,通過提前終止技術(shù),Magnet 可以在塊推送過程中有效地處理掉隊者。Magnet 不是等待 push 過程完全完成,而是限制它在 shuffle map 和 reduce 階段之間等待的時間。Magnet 的盡力而為的特性使其能夠容忍由于提前終止而未合并的塊。如下圖所示:
與 Spark 原生集成
Magnet 與 Spark 原生集成,這帶來了多種好處:
?Magnet 不依賴于其他外部系統(tǒng)。這有助于簡化 Magnet shuffle 服務(wù)的部署、監(jiān)控和生產(chǎn)。
?通過與 Spark 的 shuffle 系統(tǒng)的原生集成,Magnet shuffle 服務(wù)中的元數(shù)據(jù)可以暴露給 Spark 驅(qū)動程序。這使 Spark 驅(qū)動程序能夠?qū)崿F(xiàn)更好的性能(通過任務(wù)的位置感知調(diào)度)和更好的容錯(通過回退到原始 shuffle 塊)。
?Magnet 與現(xiàn)有的 Spark 功能(例如自適應(yīng)查詢執(zhí)行)配合得很好。Spark AQE 的承諾之一是能夠動態(tài)優(yōu)化 skew join,這也需要對 shuffle 進行特殊處理。Spark AQE 會在多個 reducer 任務(wù)之間劃分一個傾斜的 shuffle 分區(qū),每個任務(wù)只從 mapper 任務(wù)的一個子范圍中獲取 shuffle 塊。由于合并后的 shuffle 文件不再保持每個單獨的 shuffle 塊的原始邊界,因此無法按照 Spark AQE 要求的方式劃分合并后的 shuffle 文件。由于 Magnet 保留原始 shuffle 文件和合并后的 shuffle 文件,因此它可以委托 AQE 處理偏斜分區(qū),同時優(yōu)化非偏斜分區(qū)的 shuffle 操作。
性能對比
我們在 LinkedIn 上使用真實的生產(chǎn)作業(yè)評估了 Magnet 的性能,我們看到了非常不錯的結(jié)果。在下表中,我們展示了運行一個 ML 特征生成作業(yè)的性能結(jié)果,該作業(yè)具有數(shù)十個 shuffle 階段和接近 2 TB 的 shuffle 數(shù)據(jù)。與 Spark 中的原生 shuffle 相比,Magnet 取得了非常好的性能結(jié)果。請注意,Magnet 將 shuffle fetch 等待時間減少了 98%。這可以通過 Magnet 的高效隨機磁盤 I/O 和減少任務(wù)的位置感知調(diào)度來實現(xiàn)。此外,這個作業(yè)并不完全使用 Spark SQL,因為它混合使用了 Spark SQL 和非 SQL 代碼組成其計算邏輯。在優(yōu)化 shuffle 操作時,Magnet 只承擔(dān)很少的工作本身。
Total shuffle fetch wait time (min)Total executor task runtime (min)End-to-end job runtime (min)
Vanilla Spark shuffle206365077142
Magnet shuffle445 (-98%)29928 (-41%)31 (-26%)
我們還在 LinkedIn 引入了含有大量 shuffle 的作業(yè)。我們的一個生產(chǎn)集群中估計有 15% 的 shuffle 工作負載已遷移到 Magnet。在這些作業(yè)中,我們看到 shuffle fetch 等待時間、任務(wù)總運行時間和作業(yè)端到端運行時間也相應(yīng)的減少。如下圖所示,啟用 Magnet 的 Spark 作業(yè)平均減少了 3-4 倍的隨機提取等待時間。此外,我們已經(jīng)看到本地訪問的 shuffle 數(shù)據(jù)量增加了大約 10 倍,這表明基于推送的 shuffle 大大改善了數(shù)據(jù)局部性。最后,我們已經(jīng)看到作業(yè)運行時間在集群高峰時段變得更加穩(wěn)定。隨著我們加入更多的作業(yè),Magnet 將更多的 shuffle 工作負載轉(zhuǎn)換為優(yōu)化路徑,從而減輕了 shuffle 服務(wù)的壓力,并為集群帶來更多好處。另一方面,Magnet 可能會將 shuffle 臨時存儲需求加倍。我們正在通過為 shuffle 文件切換到 zstd 壓縮編解碼器來緩解這種情況,與默認壓縮編解碼器相比,這有可能將 shuffle 文件大小減少 50%。
結(jié)論和未來工作
在這篇博文中,我們介紹了 Magnet shuffle 服務(wù),這是 Apache Spark 的下一代 shuffle 架構(gòu)。Magnet 提高了 Spark 中 shuffle 操作的整體效率、可靠性和可擴展性。最近,我們也看到了業(yè)界針對 shuffle 過程提出的其他解決方案,比如 Cosco、Riffle、Zeus 和 Sailfish。我們在 VLDB 論文中對 Magnet 和其他這些解決方案進行了比較,尤其是 Cosco、Riffle 和 Sailfish。
未來,我們還將考慮在其他部署環(huán)境和計算引擎中提供基于 Magnet 推送的 shuffle。我們當前的集群部署模式為存儲和計算是在一起的。隨著 LinkedIn 正在向 Azure 遷移,我們也在評估計算和存儲分離的集群中基于推送的 shuffle 的方法。此外,我們目前基于推送的 shuffle 的設(shè)計主要針對批處理引擎,我們也在考慮它對流引擎的適用性。
感謝
需要一個專門的團隊才能將 Magnet 規(guī)模的項目帶來曙光。除了 Min Shen、Ye Zhou 和 Chandni Singh 的努力外,該項目還得到了 Venkata Krishnan Sowrirajan 和 Mridul Muralidharan 的重大貢獻。Erik Krogen、Ron Hu、Minchu Yang 和 Zoe Lin 為 Magnet 的生產(chǎn)部署和可觀察性改進做出了貢獻。特別感謝 Yuval Degani 構(gòu)建的 GridBench——這個工具使得理解各種因素對作業(yè)運行時的影響變得非常容易。特別感謝我們的合作伙伴團隊,尤其是 Jan Bob 和 Qun Li 的團隊,他們是 Magnet 的早期使用者。
像 Magnet 這樣的大型基礎(chǔ)設(shè)施工作需要管理層做出重大而持續(xù)的支持。Sunitha Beeram、Zhe Zhang、Vasanth Rajamani、Eric Baldeschwieler、Kapil Surlakar 和 Igor Perisic:感謝您的堅定支持和指導(dǎo)。Magnet 的設(shè)計也得益于與 Sriram Rao 和 Shirshanka Das 的評論和深入討論。
Magnet 得到 Apache Spark 社區(qū)的大力支持。感謝與 Databricks 的合作以及來自眾多社區(qū)成員的評論。
責(zé)任編輯:haq
-
數(shù)據(jù)
+關(guān)注
關(guān)注
8文章
7004瀏覽量
88944 -
Shuffle
+關(guān)注
關(guān)注
0文章
5瀏覽量
1685
原文標題:Magnet:即將隨 Apache Spark 3.2 發(fā)布的高性能外部 Shuffle 服務(wù)
文章出處:【微信號:DBDevs,微信公眾號:數(shù)據(jù)分析與開發(fā)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論