Spark調(diào)優(yōu)之RDD算子調(diào)優(yōu)(面試常問,建議收藏)
5. filter+coalesce/repartition(減少分區(qū))
在Spark任務(wù)中我們經(jīng)常會使用filter算子完成RDD中數(shù)據(jù)的過濾,在任務(wù)初始階段,從各個分區(qū)中加載到的數(shù)據(jù)量是相近的,但是一旦進過filter過濾后,每個分區(qū)的數(shù)據(jù)量有可能會存在較大差異,如下圖所示:
分區(qū)數(shù)據(jù)過濾結(jié)果
根據(jù)上圖我們可以發(fā)現(xiàn)兩個問題:
每個partition的數(shù)據(jù)量變小了,如果還按照之前與partition相等的task個數(shù)去處理當(dāng)前數(shù)據(jù),有點浪費task的計算資源;
每個partition的數(shù)據(jù)量不一樣,會導(dǎo)致后面的每個task處理每個partition數(shù)據(jù)的時候,每個task要處理的數(shù)據(jù)量不同,這很有可能導(dǎo)致數(shù)據(jù)傾斜問題。
如上圖所示,第二個分區(qū)的數(shù)據(jù)過濾后只剩100條,而第三個分區(qū)的數(shù)據(jù)過濾后剩下800條,在相同的處理邏輯下,第二個分區(qū)對應(yīng)的task處理的數(shù)據(jù)量與第三個分區(qū)對應(yīng)的task處理的數(shù)據(jù)量差距達到了8倍,這也會導(dǎo)致運行速度可能存在數(shù)倍的差距,這也就是數(shù)據(jù)傾斜問題。
針對上述的兩個問題,我們分別進行分析:
針對第一個問題,既然分區(qū)的數(shù)據(jù)量變小了,我們希望可以對分區(qū)數(shù)據(jù)進行重新分配,比如將原來4個分區(qū)的數(shù)據(jù)轉(zhuǎn)化到2個分區(qū)中,這樣只需要用后面的兩個task進行處理即可,避免了資源的浪費。
針對第二個問題,解決方法和第一個問題的解決方法非常相似,對分區(qū)數(shù)據(jù)重新分配,讓每個partition中的數(shù)據(jù)量差不多,這就避免了數(shù)據(jù)傾斜問題。
那么具體應(yīng)該如何實現(xiàn)上面的解決思路?我們需要coalesce算子。
repartition與coalesce都可以用來進行重分區(qū),其中repartition只是coalesce接口中shuffle為true的簡易實現(xiàn),coalesce默認情況下不進行shuffle,但是可以通過參數(shù)進行設(shè)置。
假設(shè)我們希望將原本的分區(qū)個數(shù)A通過重新分區(qū)變?yōu)锽,那么有以下幾種情況:
A > B(多數(shù)分區(qū)合并為少數(shù)分區(qū))
A與B相差值不大
此時使用coalesce即可,無需shuffle過程。
A與B相差值很大
此時可以使用coalesce并且不啟用shuffle過程,但是會導(dǎo)致合并過程性能低下,所以推薦設(shè)置coalesce的第二個參數(shù)為true,即啟動shuffle過程。
A < B(少數(shù)分區(qū)分解為多數(shù)分區(qū))
此時使用repartition即可,如果使用coalesce需要將shuffle設(shè)置為true,否則coalesce無效。
我們可以在filter操作之后,使用coalesce算子針對每個partition的數(shù)據(jù)量各不相同的情況,壓縮partition的數(shù)量,而且讓每個partition的數(shù)據(jù)量盡量均勻緊湊,以便于后面的task進行計算操作,在某種程度上能夠在一定程度上提升性能。
注意:local模式是進程內(nèi)模擬集群運行,已經(jīng)對并行度和分區(qū)數(shù)量有了一定的內(nèi)部優(yōu)化,因此不用去設(shè)置并行度和分區(qū)數(shù)量。
6. 并行度設(shè)置
Spark作業(yè)中的并行度指各個stage的task的數(shù)量。
如果并行度設(shè)置不合理而導(dǎo)致并行度過低,會導(dǎo)致資源的極大浪費,例如,20個Executor,每個Executor分配3個CPU core,而Spark作業(yè)有40個task,這樣每個Executor分配到的task個數(shù)是2個,這就使得每個Executor有一個CPU core空閑,導(dǎo)致資源的浪費。
理想的并行度設(shè)置,應(yīng)該是讓并行度與資源相匹配,簡單來說就是在資源允許的前提下,并行度要設(shè)置的盡可能大,達到可以充分利用集群資源。合理的設(shè)置并行度,可以提升整個Spark作業(yè)的性能和運行速度。
Spark官方推薦,task數(shù)量應(yīng)該設(shè)置為Spark作業(yè)總CPU core數(shù)量的2~3倍。之所以沒有推薦task數(shù)量與CPU core總數(shù)相等,是因為task的執(zhí)行時間不同,有的task執(zhí)行速度快而有的task執(zhí)行速度慢,如果task數(shù)量與CPU core總數(shù)相等,那么執(zhí)行快的task執(zhí)行完成后,會出現(xiàn)CPU core空閑的情況。如果task數(shù)量設(shè)置為CPU core總數(shù)的2~3倍,那么一個task執(zhí)行完畢后,CPU core會立刻執(zhí)行下一個task,降低了資源的浪費,同時提升了Spark作業(yè)運行的效率。
Spark作業(yè)并行度的設(shè)置如下:
val conf = new SparkConf().set("spark.default.parallelism", "500")
原則:讓 cpu 的 core(cpu 核心數(shù)) 充分利用起來, 如有100個 core,那么并行度可以設(shè)置為200~300。
7. repartition/coalesce調(diào)節(jié)并行度
Spark 中雖然可以設(shè)置并行度的調(diào)節(jié)策略,但是,并行度的設(shè)置對于Spark SQL是不生效的,用戶設(shè)置的并行度只對于Spark SQL以外的所有Spark的stage生效。
Spark SQL的并行度不允許用戶自己指定,Spark SQL自己會默認根據(jù)hive表對應(yīng)的HDFS文件的split個數(shù)自動設(shè)置Spark SQL所在的那個stage的并行度,用戶自己通 spark.default.parallelism 參數(shù)指定的并行度,只會在沒Spark SQL的stage中生效。
由于Spark SQL所在stage的并行度無法手動設(shè)置,如果數(shù)據(jù)量較大,并且此stage中后續(xù)的transformation操作有著復(fù)雜的業(yè)務(wù)邏輯,而Spark SQL自動設(shè)置的task數(shù)量很少,這就意味著每個task要處理為數(shù)不少的數(shù)據(jù)量,然后還要執(zhí)行非常復(fù)雜的處理邏輯,這就可能表現(xiàn)為第一個有Spark SQL的stage速度很慢,而后續(xù)的沒有Spark SQL的stage運行速度非?臁
為了解決Spark SQL無法設(shè)置并行度和task數(shù)量的問題,我們可以使用repartition算子。
repartition 算子使用前后對比圖如下:
repartition 算子使用前后對比圖
Spark SQL這一步的并行度和task數(shù)量肯定是沒有辦法去改變了,但是,對于Spark SQL查詢出來的RDD,立即使用repartition算子,去重新進行分區(qū),這樣可以重新分區(qū)為多個partition,從repartition之后的RDD操作,由于不再涉及Spark SQL,因此stage的并行度就會等于你手動設(shè)置的值,這樣就避免了Spark SQL所在的stage只能用少量的task去處理大量數(shù)據(jù)并執(zhí)行復(fù)雜的算法邏輯。使用repartition算子的前后對比如上圖所示。
8. reduceByKey本地預(yù)聚合
reduceByKey相較于普通的shuffle操作一個顯著的特點就是會進行map端的本地聚合,map端會先對本地的數(shù)據(jù)進行combine操作,然后將數(shù)據(jù)寫入給下個stage的每個task創(chuàng)建的文件中,也就是在map端,對每一個key對應(yīng)的value,執(zhí)行reduceByKey算子函數(shù)。
reduceByKey算子的執(zhí)行過程如下圖所示:
reduceByKey 算子執(zhí)行過程
使用reduceByKey對性能的提升如下:
本地聚合后,在map端的數(shù)據(jù)量變少,減少了磁盤IO,也減少了對磁盤空間的占用;
本地聚合后,下一個stage拉取的數(shù)據(jù)量變少,減少了網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量;
本地聚合后,在reduce端進行數(shù)據(jù)緩存的內(nèi)存占用減少;
本地聚合后,在reduce端進行聚合的數(shù)據(jù)量減少。
基于reduceByKey的本地聚合特征,我們應(yīng)該考慮使用reduceByKey代替其他的shuffle算子,例如groupByKey。
groupByKey與reduceByKey的運行原理如下圖1和圖2所示:
圖1:groupByKey原理
圖2:reduceByKey原理
根據(jù)上圖可知,groupByKey不會進行map端的聚合,而是將所有map端的數(shù)據(jù)shuffle到reduce端,然后在reduce端進行數(shù)據(jù)的聚合操作。由于reduceByKey有map端聚合的特性,使得網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量減小,因此效率要明顯高于groupByKey。
9. 使用持久化+checkpoint
Spark持久化在大部分情況下是沒有問題的,但是有時數(shù)據(jù)可能會丟失,如果數(shù)據(jù)一旦丟失,就需要對丟失的數(shù)據(jù)重新進行計算,計算完后再緩存和使用,為了避免數(shù)據(jù)的丟失,可以選擇對這個RDD進行checkpoint,也就是將數(shù)據(jù)持久化一份到容錯的文件系統(tǒng)上(比如HDFS)。
一個RDD緩存并checkpoint后,如果一旦發(fā)現(xiàn)緩存丟失,就會優(yōu)先查看checkpoint數(shù)據(jù)存不存在,如果有,就會使用checkpoint數(shù)據(jù),而不用重新計算。也即是說,checkpoint可以視為cache的保障機制,如果cache失敗,就使用checkpoint的數(shù)據(jù)。
使用checkpoint的優(yōu)點在于提高了Spark作業(yè)的可靠性,一旦緩存出現(xiàn)問題,不需要重新計算數(shù)據(jù),缺點在于,checkpoint時需要將數(shù)據(jù)寫入HDFS等文件系統(tǒng),對性能的消耗較大。
持久化設(shè)置如下:
sc.setCheckpointDir(‘HDFS’)
rdd.cache/persist(memory_and_disk)
rdd.checkpoint
10. 使用廣播變量
默認情況下,task中的算子中如果使用了外部的變量,每個task都會獲取一份變量的復(fù)本,這就造成了內(nèi)存的極大消耗。一方面,如果后續(xù)對RDD進行持久化,可能就無法將RDD數(shù)據(jù)存入內(nèi)存,只能寫入磁盤,磁盤IO將會嚴重消耗性能;另一方面,task在創(chuàng)建對象的時候,也許會發(fā)現(xiàn)堆內(nèi)存無法存放新創(chuàng)建的對象,這就會導(dǎo)致頻繁的GC,GC會導(dǎo)致工作線程停止,進而導(dǎo)致Spark暫停工作一段時間,嚴重影響Spark性能。
假設(shè)當(dāng)前任務(wù)配置了20個Executor,指定500個task,有一個20M的變量被所有task共用,此時會在500個task中產(chǎn)生500個副本,耗費集群10G的內(nèi)存,如果使用了廣播變量, 那么每個Executor保存一個副本,一共消耗400M內(nèi)存,內(nèi)存消耗減少了5倍。
廣播變量在每個Executor保存一個副本,此Executor的所有task共用此廣播變量,這讓變量產(chǎn)生的副本數(shù)量大大減少。
在初始階段,廣播變量只在Driver中有一份副本。task在運行的時候,想要使用廣播變量中的數(shù)據(jù),此時首先會在自己本地的Executor對應(yīng)的BlockManager中嘗試獲取變量,如果本地沒有,BlockManager就會從Driver或者其他節(jié)點的BlockManager上遠程拉取變量的復(fù)本,并由本地的BlockManager進行管理;之后此Executor的所有task都會直接從本地的BlockManager中獲取變量。
對于多個Task可能會共用的數(shù)據(jù)可以廣播到每個Executor上:
val 廣播變量名= sc.broadcast(會被各個Task用到的變量,即需要廣播的變量)
廣播變量名.value//獲取廣播變量
11. 使用Kryo序列化
默認情況下,Spark使用Java的序列化機制。Java的序列化機制使用方便,不需要額外的配置,在算子中使用的變量實現(xiàn)Serializable接口即可,但是,Java序列化機制的效率不高,序列化速度慢并且序列化后的數(shù)據(jù)所占用的空間依然較大。
Spark官方宣稱Kryo序列化機制比Java序列化機制性能提高10倍左右,Spark之所以沒有默認使用Kryo作為序列化類庫,是因為它不支持所有對象的序列化,同時Kryo需要用戶在使用前注冊需要序列化的類型,不夠方便,但從Spark 2.0.0版本開始,簡單類型、簡單類型數(shù)組、字符串類型的Shuffling RDDs 已經(jīng)默認使用Kryo序列化方式了。
Kryo序列化注冊方式的代碼如下:
public class MyKryoRegistrator implements KryoRegistrator{
@Override
public void registerClasses(Kryo kryo){
kryo.register(StartupReportLogs.class);
}
}
配置Kryo序列化方式的代碼如下:
//創(chuàng)建SparkConf對象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用Kryo序列化庫
conf.set("spark.serializer", "org.a(chǎn)pache.spark.serializer.KryoSerializer");
//在Kryo序列化庫中注冊自定義的類集合
conf.set("spark.kryo.registrator", "bigdata.com.MyKryoRegistrator");

最新活動更多
推薦專題
- 1 UALink規(guī)范發(fā)布:挑戰(zhàn)英偉達AI統(tǒng)治的開始
- 2 北電數(shù)智主辦酒仙橋論壇,探索AI產(chǎn)業(yè)發(fā)展新路徑
- 3 “AI寒武紀”爆發(fā)至今,五類新物種登上歷史舞臺
- 4 降薪、加班、裁員三重暴擊,“AI四小龍”已折戟兩家
- 5 國產(chǎn)智駕迎戰(zhàn)特斯拉FSD,AI含量差幾何?
- 6 光計算迎來商業(yè)化突破,但落地仍需時間
- 7 東陽光:2024年扭虧、一季度凈利大增,液冷疊加具身智能打開成長空間
- 8 地平線自動駕駛方案解讀
- 9 封殺AI“照騙”,“淘寶們”終于不忍了?
- 10 優(yōu)必選:營收大增主靠小件,虧損繼續(xù)又逢關(guān)稅,能否乘機器人東風(fēng)翻身?