基于Spark的數(shù)據(jù)分析實(shí)踐
三、SparkSQL
Spark 從 1.3 版本開(kāi)始原有 SchemaRDD 的基礎(chǔ)上提供了類似Pandas DataFrame API。新的DataFrame API不僅可以大幅度降低普通開(kāi)發(fā)者的學(xué)習(xí)門檻,同時(shí)還支持Scala、Java與Python三種語(yǔ)言。更重要的是,由于脫胎自SchemaRDD,DataFrame天然適用于分布式大數(shù)據(jù)場(chǎng)景。
一般的數(shù)據(jù)處理步驟:讀入數(shù)據(jù) -> 對(duì)數(shù)據(jù)進(jìn)行處理 -> 分析結(jié)果 -> 寫入結(jié)果
SparkSQL 結(jié)構(gòu)化數(shù)據(jù)
處理結(jié)構(gòu)化數(shù)據(jù)(如 CSV,JSON,Parquet 等);
把已經(jīng)結(jié)構(gòu)化數(shù)據(jù)抽象成 DataFrame (HiveTable);
非結(jié)構(gòu)化數(shù)據(jù)通過(guò) RDD.map.filter 轉(zhuǎn)換成結(jié)構(gòu)化進(jìn)行處理;
按照列式數(shù)據(jù)庫(kù),只加載非結(jié)構(gòu)化中可結(jié)構(gòu)化的部分列(Hbase,MongoDB);
處理非結(jié)構(gòu)化數(shù)據(jù),不能簡(jiǎn)單的用 DataFrame 裝載。而是要用 SparkRDD 把數(shù)據(jù)讀入,在通過(guò)一系列的 Transformer Method 把非結(jié)構(gòu)化的數(shù)據(jù)加工為結(jié)構(gòu)化,或者過(guò)濾到不合法的數(shù)據(jù)。
SparkSQL DataFrame
SparkSQL 中一切都是 DataFrame,all in DataFrame. DataFrame是一種以RDD為基礎(chǔ)的分布式數(shù)據(jù)集,類似于傳統(tǒng)數(shù)據(jù)庫(kù)中的二維表格。DataFrame與RDD的主要區(qū)別在于,前者帶有schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型。如果熟悉 Python Pandas 庫(kù)中的 DataFrame 結(jié)構(gòu),則會(huì)對(duì) SparkSQL DataFrame 概念非常熟悉。
TextFile DataFrame
import.org.a(chǎn)pache.spark.sql._//定義數(shù)據(jù)的列名稱和類型valdt=StructType(List(id:String,name:String,gender:String,age:Int))
//導(dǎo)入user_info.csv文件并指定分隔符vallines = sc.textFile("/path/user_info.csv").map(_.split(","))
//將表結(jié)構(gòu)和數(shù)據(jù)關(guān)聯(lián)起來(lái),把讀入的數(shù)據(jù)user.csv映射成行,構(gòu)成數(shù)據(jù)集valrowRDD = lines.map(x=>Row(x(0),x(1),x(2),x(3).toInt))
//通過(guò)SparkSession.createDataFrame()創(chuàng)建表,并且數(shù)據(jù)表表頭val df= spark.createDataFrame(rowRDD, dt)
可左右滑動(dòng)查看代碼
讀取規(guī)則數(shù)據(jù)文件作為DataFrame
SparkSession.Builder builder = SparkSession.builder()Builder.setMaster("local").setAppName("TestSparkSQLApp")SparkSession spark = builder.getOrCreate();SQLContext sqlContext = spark.sqlContext();
# 讀取 JSON 數(shù)據(jù),path 可為文件或者目錄valdf=sqlContext.read().json(path);
# 讀取 HadoopParquet 文件vardf=sqlContext.read().parquet(path);
# 讀取 HadoopORC 文件vardf=sqlContext.read().orc(path);
可左右滑動(dòng)查看代碼
JSON 文件為每行一個(gè) JSON 對(duì)象的文件類型,行尾無(wú)須逗號(hào)。文件頭也無(wú)須[]指定為數(shù)組;SparkSQL 讀取是只是按照每行一條 JSON Record序列化;
Parquet文件
Configurationconfig = new Configuration();ParquetFileReaderreader = ParquetFileReader.open( HadoopInputFile.fromPath(new Path("hdfs:///path/file.parquet"),conf));Map<String, String>schema = reader.getFileMetaData().getKeyValueMetaData();String allFields= schema.get("org.a(chǎn)pache.spark.sql.parquet.row.metadata");
可左右滑動(dòng)查看代碼
allFiedls 的值就是各字段的名稱和具體的類型,整體是一個(gè)json格式進(jìn)行展示。
讀取 Hive 表作為 DataFrame
Spark2 API 推薦通過(guò) SparkSession.Builder 的 Builder 模式創(chuàng)建 SparkContext。 Builder.getOrCreate() 用于創(chuàng)建 SparkSession,SparkSession 是 SparkContext 的封裝。
在Spark1.6中有兩個(gè)核心組件SQLcontext和HiveContext。SQLContext 用于處理在 SparkSQL 中動(dòng)態(tài)注冊(cè)的表,HiveContext 用于處理 Hive 中的表。
從Spark2.0以上的版本開(kāi)始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext。SQLContext.sql 即可執(zhí)行 Hive 中的表,也可執(zhí)行內(nèi)部注冊(cè)的表;
在需要執(zhí)行 Hive 表時(shí),只需要在 SparkSession.Builder 中開(kāi)啟 Hive 支持即可(enableHiveSupport())。
SparkSession.Builder builder = SparkSession.builder().enableHiveSupport();SparkSession spark = builder.getOrCreate();SQLContext sqlContext = spark.sqlContext();
可左右滑動(dòng)查看代碼
// db 指 Hive 庫(kù)中的數(shù)據(jù)庫(kù)名,如果不寫默認(rèn)為 default
// tableName 指 hive 庫(kù)的數(shù)據(jù)表名
sqlContext.sql(“select * from db.tableName”)
可左右滑動(dòng)查看代碼
SparkSQL ThriftServer
//首先打開(kāi) Hive 的 Metastore服務(wù)
hive$bin/hive –-service metastore –p 8093
可左右滑動(dòng)查看代碼
//把 Spark 的相關(guān) jar 上傳到hadoophdfs指定目錄,用于指定sparkonyarn的依賴 jar
spark$hadoop fs –put jars/*.jar /lib/spark2
可左右滑動(dòng)查看代碼
// 啟動(dòng) spark thriftserver 服務(wù)
spark$ sbin/start-thriftserver.sh --master yarn-client --driver-memory 1G --conf spark.yarn.jars=hdfs:///lib/spark2/*.jar
可左右滑動(dòng)查看代碼
當(dāng)hdfs 上傳了spark 依賴 jar 時(shí),通過(guò)spark.yarn.jars 可看到日志 spark 無(wú)須每個(gè)job 都上傳jar,可節(jié)省啟動(dòng)時(shí)間
19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.0.5.jar19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.1.7.3.jar
可左右滑動(dòng)查看代碼
//通過(guò) spark bin 下的 beeline 工具,可以連接到 spark ThriftServer(SparkOnHive)
bin/beeline -u jdbc:hive2://ip:10000/default -n hadoop
可左右滑動(dòng)查看代碼
-u 是指定 beeline 的執(zhí)行驅(qū)動(dòng)地址;
-n 是指定登陸到 spark Session 上的用戶名稱;
Beeline 還支持傳入-e 可傳入一行 SQL,
-e <query> query that should be executed
也可通過(guò) –f 指定一個(gè) SQL File,內(nèi)部可用逗號(hào)分隔的多個(gè) SQL(存儲(chǔ)過(guò)程)
-f <exec file> script file that should be executed
SparkSQL Beeline 的執(zhí)行效果展示
SparkSQL ThriftServer
對(duì)于 SparkSQL ThriftServer 服務(wù),每個(gè)登陸的用戶都有創(chuàng)建的 SparkSession,并且執(zhí)行的對(duì)個(gè) SQL 會(huì)通過(guò)時(shí)間順序列表展示。
SparkSQL ThriftServer 服務(wù)可用于其他支持的數(shù)據(jù)庫(kù)工具創(chuàng)建查詢,也用于第三方的 BI 工具,如 tableau。
四、SparkSQL Flow
SparkSQL Flow 是以 SparkSQL 為基礎(chǔ),開(kāi)發(fā)的統(tǒng)一的基于 XML 配置化的可執(zhí)行一連串的 SQL 操作,這一連串的 SQL 操作定義為一個(gè) Flow。下文開(kāi)始 SparkSQL Flow 的介紹:
SparkSQL Flow 是基于 SparkSQL 開(kāi)發(fā)的一種基于 XML 配置化的 SQL 數(shù)據(jù)流轉(zhuǎn)處理模型。該模型簡(jiǎn)化了 SparkSQL 、Spark RDD的開(kāi)發(fā),并且降低開(kāi)發(fā)了難度,適合了解數(shù)據(jù)業(yè)務(wù)但無(wú)法駕馭大數(shù)據(jù)以及 Spark 技術(shù)的開(kāi)發(fā)者。
一個(gè)由普元技術(shù)部提供的基于 SparkSQL 的開(kāi)發(fā)模型;
一個(gè)可二次定制開(kāi)發(fā)的大數(shù)據(jù)開(kāi)發(fā)框架,提供了靈活的可擴(kuò)展 API;
一個(gè)提供了 對(duì)文件,數(shù)據(jù)庫(kù),NoSQL 等統(tǒng)一的數(shù)據(jù)開(kāi)發(fā)視界語(yǔ)義;
基于 SQL 的開(kāi)發(fā)語(yǔ)言和 XML 的模板配置,支持 Spark UDF 的擴(kuò)展管理;
支持基于 Spark Standlone,Yarn,Mesos 資源管理平臺(tái);
支持開(kāi)源、華為、星環(huán)等平臺(tái)統(tǒng)一認(rèn)證。
SparkSQL Flow 適合的場(chǎng)景:
批量 ETL;
非實(shí)時(shí)分析服務(wù);
SparkSQL Flow XML 概覽
Properties 內(nèi)定義一組變量,可用于宏替換;
Methods 內(nèi)可注冊(cè) udf 和 udaf 兩種函數(shù);
Prepare 內(nèi)可定義前置 SQL,用于執(zhí)行 source 前的 sql 操作;
Sources 內(nèi)定義一個(gè)到多個(gè)數(shù)據(jù)表視圖;
Transformer 內(nèi)可定義 0 到多個(gè)基于 SQL 的數(shù)據(jù)轉(zhuǎn)換操作(支持 join);
Targets 用于定義 1 到多個(gè)數(shù)據(jù)輸出;
After 可定義 0到多個(gè)任務(wù)日志;
如你所見(jiàn),source 的 type 參數(shù)用于區(qū)分 source 的類型,source 支持的種類直接決定SparkSQL Flow 的數(shù)據(jù)源加載廣度;并且,根據(jù) type 不同,source 也需要配置不同的參數(shù),如數(shù)據(jù)庫(kù)還需要 driver,url,user和 password 參數(shù)。
Transformer 是基于 source 定的數(shù)據(jù)視圖可執(zhí)行的一組轉(zhuǎn)換 SQL,該 SQL 符合 SparkSQL 的語(yǔ)法(SQL99)。Transform 的 SQL 的執(zhí)行結(jié)果被作為中間表命名為 table_name 指定的值。
Targets 為定義輸出,table_name 的值需在 source 或者 Transformer 中定義。

發(fā)表評(píng)論
請(qǐng)輸入評(píng)論內(nèi)容...
請(qǐng)輸入評(píng)論/評(píng)論長(zhǎng)度6~500個(gè)字
最新活動(dòng)更多
-
3月27日立即報(bào)名>> 【工程師系列】汽車電子技術(shù)在線大會(huì)
-
4月30日立即下載>> 【村田汽車】汽車E/E架構(gòu)革新中,新智能座艙挑戰(zhàn)的解決方案
-
5月15-17日立即預(yù)約>> 【線下巡回】2025年STM32峰會(huì)
-
即日-5.15立即報(bào)名>>> 【在線會(huì)議】安森美Hyperlux™ ID系列引領(lǐng)iToF技術(shù)革新
-
5月15日立即下載>> 【白皮書】精確和高效地表征3000V/20A功率器件應(yīng)用指南
-
5月16日立即參評(píng) >> 【評(píng)選啟動(dòng)】維科杯·OFweek 2025(第十屆)人工智能行業(yè)年度評(píng)選
推薦專題
- 1 UALink規(guī)范發(fā)布:挑戰(zhàn)英偉達(dá)AI統(tǒng)治的開(kāi)始
- 2 北電數(shù)智主辦酒仙橋論壇,探索AI產(chǎn)業(yè)發(fā)展新路徑
- 3 “AI寒武紀(jì)”爆發(fā)至今,五類新物種登上歷史舞臺(tái)
- 4 降薪、加班、裁員三重暴擊,“AI四小龍”已折戟兩家
- 5 國(guó)產(chǎn)智駕迎戰(zhàn)特斯拉FSD,AI含量差幾何?
- 6 光計(jì)算迎來(lái)商業(yè)化突破,但落地仍需時(shí)間
- 7 東陽(yáng)光:2024年扭虧、一季度凈利大增,液冷疊加具身智能打開(kāi)成長(zhǎng)空間
- 8 優(yōu)必選:營(yíng)收大增主靠小件,虧損繼續(xù)又逢關(guān)稅,能否乘機(jī)器人東風(fēng)翻身?
- 9 封殺AI“照騙”,“淘寶們”終于不忍了?
- 10 地平線自動(dòng)駕駛方案解讀