Shuffle核心概念、Shuffle調(diào)優(yōu)及故障排除
三、 SortShuffle解析
SortShuffleManager的運行機制主要分成兩種,一種是普通運行機制,另一種是bypass運行機制。當shuffle read task的數(shù)量小于等于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值時(默認為200),就會啟用bypass機制。
1. 普通運行機制
在該模式下,數(shù)據(jù)會先寫入一個內(nèi)存數(shù)據(jù)結(jié)構(gòu)中,此時根據(jù)不同的shuffle算子,可能選用不同的數(shù)據(jù)結(jié)構(gòu)。如果是reduceByKey這種聚合類的shuffle算子,那么會選用Map數(shù)據(jù)結(jié)構(gòu),一邊通過Map進行聚合,一邊寫入內(nèi)存;如果是join這種普通的shuffle算子,那么會選用Array數(shù)據(jù)結(jié)構(gòu),直接寫入內(nèi)存。接著,每寫一條數(shù)據(jù)進入內(nèi)存數(shù)據(jù)結(jié)構(gòu)之后,就會判斷一下,是否達到了某個臨界閾值。如果達到臨界閾值的話,那么就會嘗試將內(nèi)存數(shù)據(jù)結(jié)構(gòu)中的數(shù)據(jù)溢寫到磁盤,然后清空內(nèi)存數(shù)據(jù)結(jié)構(gòu)。
在溢寫到磁盤文件之前,會先根據(jù)key對內(nèi)存數(shù)據(jù)結(jié)構(gòu)中已有的數(shù)據(jù)進行排序。排序過后,會分批將數(shù)據(jù)寫入磁盤文件。默認的batch數(shù)量是10000條,也就是說,排序好的數(shù)據(jù),會以每批1萬條數(shù)據(jù)的形式分批寫入磁盤文件。寫入磁盤文件是通過Java的BufferedOutputStream實現(xiàn)的。BufferedOutputStream是Java的緩沖輸出流,首先會將數(shù)據(jù)緩沖在內(nèi)存中,當內(nèi)存緩沖滿溢之后再一次寫入磁盤文件中,這樣可以減少磁盤IO次數(shù),提升性能。
一個task將所有數(shù)據(jù)寫入內(nèi)存數(shù)據(jù)結(jié)構(gòu)的過程中,會發(fā)生多次磁盤溢寫操作,也就會產(chǎn)生多個臨時文件。最后會將之前所有的臨時磁盤文件都進行合并,這就是merge過程,此時會將之前所有臨時磁盤文件中的數(shù)據(jù)讀取出來,然后依次寫入最終的磁盤文件之中。此外,由于一個task就只對應一個磁盤文件,也就意味著該task為下游stage的task準備的數(shù)據(jù)都在這一個文件中,因此還會單獨寫一份索引文件,其中標識了下游各個task的數(shù)據(jù)在文件中的start offset與end offset。
SortShuffleManager由于有一個磁盤文件merge的過程,因此大大減少了文件數(shù)量。比如第一個stage有50個task,總共有10個Executor,每個Executor執(zhí)行5個task,而第二個stage有100個task。由于每個task最終只有一個磁盤文件,因此此時每個Executor上只有5個磁盤文件,所有Executor只有50個磁盤文件。
普通運行機制的SortShuffleManager工作原理如下圖所示:
普通運行機制的SortShuffleManager工作原理2. bypass運行機制
bypass運行機制的觸發(fā)條件如下:
shuffle map task數(shù)量小于spark.shuffle.sort.bypassMergeThreshold=200參數(shù)的值。不是聚合類的shuffle算子。
此時,每個task會為每個下游task都創(chuàng)建一個臨時磁盤文件,并將數(shù)據(jù)按key進行hash然后根據(jù)key的hash值,將key寫入對應的磁盤文件之中。當然,寫入磁盤文件時也是先寫入內(nèi)存緩沖,緩沖寫滿之后再溢寫到磁盤文件的。最后,同樣會將所有臨時磁盤文件都合并成一個磁盤文件,并創(chuàng)建一個單獨的索引文件。
該過程的磁盤寫機制其實跟未經(jīng)優(yōu)化的HashShuffleManager是一模一樣的,因為都要創(chuàng)建數(shù)量驚人的磁盤文件,只是在最后會做一個磁盤文件的合并而已。因此少量的最終磁盤文件,也讓該機制相對未經(jīng)優(yōu)化的HashShuffleManager來說,shuffle read的性能會更好。
而該機制與普通SortShuffleManager運行機制的不同在于:第一,磁盤寫機制不同;第二,不會進行排序。也就是說,啟用該機制的最大好處在于,shuffle write過程中,不需要進行數(shù)據(jù)的排序操作,也就節(jié)省掉了這部分的性能開銷。
bypass運行機制的SortShuffleManager工作原理如下圖所示:
bypass運行機制的SortShuffleManager工作原理
四、map和reduce端緩沖區(qū)大小
在Spark任務運行過程中,如果shuffle的map端處理的數(shù)據(jù)量比較大,但是map端緩沖的大小是固定的,可能會出現(xiàn)map端緩沖數(shù)據(jù)頻繁spill溢寫到磁盤文件中的情況,使得性能非常低下,通過調(diào)節(jié)map端緩沖的大小,可以避免頻繁的磁盤IO操作,進而提升Spark任務的整體性能。
map端緩沖的默認配置是32KB,如果每個task處理640KB的數(shù)據(jù),那么會發(fā)生640/32 = 20次溢寫,如果每個task處理64000KB的數(shù)據(jù),即會發(fā)生64000/32=2000次溢寫,這對于性能的影響是非常嚴重的。
map端緩沖的配置方法:
val conf = new SparkConf()
.set("spark.shuffle.file.buffer", "64")
Spark Shuffle過程中,shuffle reduce task的buffer緩沖區(qū)大小決定了reduce task每次能夠緩沖的數(shù)據(jù)量,也就是每次能夠拉取的數(shù)據(jù)量,如果內(nèi)存資源較為充足,適當增加拉取數(shù)據(jù)緩沖區(qū)的大小,可以減少拉取數(shù)據(jù)的次數(shù),也就可以減少網(wǎng)絡傳輸?shù)拇螖?shù),進而提升性能。
reduce端數(shù)據(jù)拉取緩沖區(qū)的大小可以通過spark.reducer.maxSizeInFlight參數(shù)進行設(shè)置,默認為48MB。該參數(shù)的設(shè)置方法如下:
reduce端數(shù)據(jù)拉取緩沖區(qū)配置:
val conf = new SparkConf()
.set("spark.reducer.maxSizeInFlight", "96")
五、reduce端重試次數(shù)和等待時間間隔
Spark Shuffle過程中,reduce task拉取屬于自己的數(shù)據(jù)時,如果因為網(wǎng)絡異常等原因?qū)е率詣舆M行重試。對于那些包含了特別耗時的shuffle操作的作業(yè),建議增加重試最大次數(shù)(比如60次),以避免由于JVM的full gc或者網(wǎng)絡不穩(wěn)定等因素導致的數(shù)據(jù)拉取失敗。在實踐中發(fā)現(xiàn),對于針對超大數(shù)據(jù)量(數(shù)十億~上百億)的shuffle過程,調(diào)節(jié)該參數(shù)可以大幅度提升穩(wěn)定性。
reduce端拉取數(shù)據(jù)重試次數(shù)可以通過spark.shuffle.io.maxRetries參數(shù)進行設(shè)置,該參數(shù)就代表了可以重試的最大次數(shù)。如果在指定次數(shù)之內(nèi)拉取還是沒有成功,就可能會導致作業(yè)執(zhí)行失敗,默認為3,該參數(shù)的設(shè)置方法如下:
reduce端拉取數(shù)據(jù)重試次數(shù)配置:
val conf = new SparkConf()
.set("spark.shuffle.io.maxRetries", "6")
Spark Shuffle過程中,reduce task拉取屬于自己的數(shù)據(jù)時,如果因為網(wǎng)絡異常等原因?qū)е率詣舆M行重試,在一次失敗后,會等待一定的時間間隔再進行重試,可以通過加大間隔時長(比如60s),以增加shuffle操作的穩(wěn)定性。
reduce端拉取數(shù)據(jù)等待間隔可以通過spark.shuffle.io.retryWait參數(shù)進行設(shè)置,默認值為5s,該參數(shù)的設(shè)置方法如下:
reduce端拉取數(shù)據(jù)等待間隔配置:
val conf = new SparkConf()
.set("spark.shuffle.io.retryWait", "60s")

最新活動更多
推薦專題
- 1 UALink規(guī)范發(fā)布:挑戰(zhàn)英偉達AI統(tǒng)治的開始
- 2 北電數(shù)智主辦酒仙橋論壇,探索AI產(chǎn)業(yè)發(fā)展新路徑
- 3 “AI寒武紀”爆發(fā)至今,五類新物種登上歷史舞臺
- 4 降薪、加班、裁員三重暴擊,“AI四小龍”已折戟兩家
- 5 國產(chǎn)智駕迎戰(zhàn)特斯拉FSD,AI含量差幾何?
- 6 光計算迎來商業(yè)化突破,但落地仍需時間
- 7 東陽光:2024年扭虧、一季度凈利大增,液冷疊加具身智能打開成長空間
- 8 地平線自動駕駛方案解讀
- 9 封殺AI“照騙”,“淘寶們”終于不忍了?
- 10 優(yōu)必選:營收大增主靠小件,虧損繼續(xù)又逢關(guān)稅,能否乘機器人東風翻身?