訂閱
糾錯
加入自媒體

Spark調優(yōu)之RDD算子調優(yōu)(面試常問,建議收藏)

2021-03-13 08:59
園陌
關注

Spark調優(yōu)之RDD算子調優(yōu)

不廢話,直接進入正題!

1. RDD復用

在對RDD進行算子時,要避免相同的算子和計算邏輯之下對RDD進行重復的計算,如下圖所示:

RDD的重復計算

對上圖中的RDD計算架構進行修改,得到如下圖所示的優(yōu)化結果:

RDD架構優(yōu)化

2. 盡早filter

獲取到初始RDD后,應該考慮盡早地過濾掉不需要的數據,進而減少對內存的占用,從而提升Spark作業(yè)的運行效率。

3. 讀取大量小文件-用wholeTextFiles

當我們將一個文本文件讀取為 RDD 時,輸入的每一行都會成為RDD的一個元素。

也可以將多個完整的文本文件一次性讀取為一個pairRDD,其中鍵是文件名,值是文件內容。

val input:RDD[String] = sc.textFile("dir.log")

如果傳遞目錄,則將目錄下的所有文件讀取作為RDD。文件路徑支持通配符。

但是這樣對于大量的小文件讀取效率并不高,應該使用 wholeTextFiles
返回值為RDD[(String, String)],其中Key是文件的名稱,Value是文件的內容。

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)])

wholeTextFiles讀取小文件:

val filesRDD: RDD[(String, String)] =
sc.wholeTextFiles("D:\data\files", minPartitions = 3)
val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\r\n"))
val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)

4. mapPartition和foreachPartition

mapPartitions

map(_….)  表示每一個元素

mapPartitions(_….)  表示每個分區(qū)的數據組成的迭代器

普通的map算子對RDD中的每一個元素進行操作,而mapPartitions算子對RDD中每一個分區(qū)進行操作。

如果是普通的map算子,假設一個partition有1萬條數據,那么map算子中的function要執(zhí)行1萬次,也就是對每個元素進行操作。

map 算子

如果是mapPartition算子,由于一個task處理一個RDD的partition,那么一個task只會執(zhí)行一次function,function一次接收所有的partition數據,效率比較高。

mapPartition 算子

比如,當要把RDD中的所有數據通過JDBC寫入數據,如果使用map算子,那么需要對RDD中的每一個元素都創(chuàng)建一個數據庫連接,這樣對資源的消耗很大,如果使用mapPartitions算子,那么針對一個分區(qū)的數據,只需要建立一個數據庫連接。

mapPartitions算子也存在一些缺點:對于普通的map操作,一次處理一條數據,如果在處理了2000條數據后內存不足,那么可以將已經處理完的2000條數據從內存中垃圾回收掉;但是如果使用mapPartitions算子,但數據量非常大時,function一次處理一個分區(qū)的數據,如果一旦內存不足,此時無法回收內存,就可能會OOM,即內存溢出。

因此,mapPartitions算子適用于數據量不是特別大的時候,此時使用mapPartitions算子對性能的提升效果還是不錯的。(當數據量很大的時候,一旦使用mapPartitions算子,就會直接OOM)

在項目中,應該首先估算一下RDD的數據量、每個partition的數據量,以及分配給每個Executor的內存資源,如果資源允許,可以考慮使用mapPartitions算子代替map。

foreachPartition

rrd.foreache(_….) 表示每一個元素

rrd.forPartitions(_….)  表示每個分區(qū)的數據組成的迭代器

在生產環(huán)境中,通常使用foreachPartition算子來完成數據庫的寫入,通過foreachPartition算子的特性,可以優(yōu)化寫數據庫的性能。

如果使用foreach算子完成數據庫的操作,由于foreach算子是遍歷RDD的每條數據,因此,每條數據都會建立一個數據庫連接,這是對資源的極大浪費,因此,對于寫數據庫操作,我們應當使用foreachPartition算子。

與mapPartitions算子非常相似,foreachPartition是將RDD的每個分區(qū)作為遍歷對象,一次處理一個分區(qū)的數據,也就是說,如果涉及數據庫的相關操作,一個分區(qū)的數據只需要創(chuàng)建一次數據庫連接,如下圖所示:

foreachPartition 算子

使用了foreachPartition 算子后,可以獲得以下的性能提升:

對于我們寫的function函數,一次處理一整個分區(qū)的數據;

對于一個分區(qū)內的數據,創(chuàng)建唯一的數據庫連接;

只需要向數據庫發(fā)送一次SQL語句和多組參數;

在生產環(huán)境中,全部都會使用foreachPartition算子完成數據庫操作。foreachPartition算子存在一個問題,與mapPartitions算子類似,如果一個分區(qū)的數據量特別大,可能會造成OOM,即內存溢出。

1  2  下一頁>  
聲明: 本文由入駐維科號的作者撰寫,觀點僅代表作者本人,不代表OFweek立場。如有侵權或其他問題,請聯系舉報。

發(fā)表評論

0條評論,0人參與

請輸入評論內容...

請輸入評論/評論長度6~500個字

您提交的評論過于頻繁,請輸入驗證碼繼續(xù)

暫無評論

暫無評論

    掃碼關注公眾號
    OFweek人工智能網
    獲取更多精彩內容
    文章糾錯
    x
    *文字標題:
    *糾錯內容:
    聯系郵箱:
    *驗 證 碼:

    粵公網安備 44030502002758號