一、Spark SQL的概念理解
Spark SQL是spark套件中一個模板,它將數(shù)據(jù)的計算任務(wù)通過SQL的形式轉(zhuǎn)換成了RDD的計算,類似于Hive通過SQL的形式將數(shù)據(jù)的計算任務(wù)轉(zhuǎn)換成了MapReduce。
Spark SQL的特點:
和Spark Core的無縫集成,可以在寫整個RDD應(yīng)用的時候,配置Spark SQL來完成邏輯實現(xiàn)。
統(tǒng)一的數(shù)據(jù)訪問方式,Spark SQL提供標準化的SQL查詢。
Hive的繼承,Spark SQL通過內(nèi)嵌的hive或者連接外部已經(jīng)部署好的hive案例,實現(xiàn)了對hive語法的繼承和操作。
標準化的連接方式,Spark SQL可以通過啟動thrift Server來支持JDBC、ODBC的訪問,將自己作為一個BI Server使用
Spark SQL數(shù)據(jù)抽象:
RDD(Spark1.0)-》DataFrame(Spark1.3)-》DataSet(Spark1.6)
Spark SQL提供了DataFrame和DataSet的數(shù)據(jù)抽象
DataFrame就是RDD+Schema,可以認為是一張二維表格,劣勢在于編譯器不進行表格中的字段的類型檢查,在運行期進行檢查
DataSet是Spark最新的數(shù)據(jù)抽象,Spark的發(fā)展會逐步將DataSet作為主要的數(shù)據(jù)抽象,弱化RDD和DataFrame.DataSet包含了DataFrame所有的優(yōu)化機制。除此之外提供了以樣例類為Schema模型的強類型
DataFrame=DataSet[Row]
DataFrame和DataSet都有可控的內(nèi)存管理機制,所有數(shù)據(jù)都保存在非堆上,都使用了catalyst進行SQL的優(yōu)化。
Spark SQL客戶端查詢:
可以通過Spark-shell來操作Spark SQL,spark作為SparkSession的變量名,sc作為SparkContext的變量名
可以通過Spark提供的方法讀取json文件,將json文件轉(zhuǎn)換成DataFrame
可以通過DataFrame提供的API來操作DataFrame里面的數(shù)據(jù)。
可以通過將DataFrame注冊成為一個臨時表的方式,來通過Spark.sql方法運行標準的SQL語句來查詢。
二、Spark SQL查詢方式
DataFrame查詢方式
DataFrame支持兩種查詢方式:一種是DSL風(fēng)格,另外一種是SQL風(fēng)格
(1)、DSL風(fēng)格:
需要引入import spark.implicit. _ 這個隱式轉(zhuǎn)換,可以將DataFrame隱式轉(zhuǎn)換成RDD
(2)、SQL風(fēng)格:
a、需要將DataFrame注冊成一張表格,如果通過CreateTempView這種方式來創(chuàng)建,那么該表格Session有效,如果通過CreateGlobalTempView來創(chuàng)建,那么該表格跨Session有效,但是SQL語句訪問該表格的時候需要加上前綴global_temp
b、需要通過sparkSession.sql方法來運行你的SQL語句
DataSet查詢方式
定義一個DataSet,先定義一個Case類
三、DataFrame、Dataset和RDD互操作
RDD-》DataFrame
普通方式:例如rdd.map(para(para(0).trim(),para(1).trim().toInt)).toDF(“name”,“age”)
通過反射來設(shè)置schema,例如:
#通過反射設(shè)置schema,數(shù)據(jù)集是spark自帶的people.txt,路徑在下面的代碼中case class Person(name:String,age:Int)
val peopleDF=spark.sparkContext.textFile(“file:///root/spark/spark2.4.1/examples/src/main/resources/people.txt”).map(_.split(“,”)).map(para=》Person(para(0).trim,para(1).trim.toInt)).toDF
peopleDF.show
#注冊成一張臨時表
peopleDF.createOrReplaceTempView(“persons”)
val teen=spark.sql(“select name,age from persons where age between 13 and 29”)
teen.show
這時teen是一張表,每一行是一個row對象,如果需要訪問Row對象中的每一個元素,可以通過下標 row(0);你也可以通過列名 row.getAs[String](“name”)
也可以使用getAs方法:
3、通過編程的方式來設(shè)置schema,適用于編譯器不能確定列的情況
val peopleRDD=spark.sparkContext.textFile(“file:///root/spark/spark2.4.1/examples/src/main/resources/people.txt”)
val schemaString=“name age”
val filed=schemaString.split(“ ”).map(filename=》 org.apache.spark.sql.types.StructField(filename,org.apache.spark.sql.types.StringType,nullable = true))
val schema=org.apache.spark.sql.types.StructType(filed)
peopleRDD.map(_.split(“,”)).map(para=》org.apache.spark.sql.Row(para(0).trim,para(1).trim))
val peopleDF=spark.createDataFrame(res6,schema)
peopleDF.show
DataFrame-》RDD
dataFrame.rdd
RDD-》DataSet
rdd.map(para=》 Person(para(0).trim(),para(1).trim().toInt)).toDS
DataSet-》DataSet
dataSet.rdd
DataFrame -》 DataSet
dataFrame.to[Person]
DataSet -》 DataFrame
dataSet.toDF
四、用戶自定義函數(shù)
用戶自定義UDF函數(shù)
通過spark.udf功能用戶可以自定義函數(shù)
自定義udf函數(shù):
通過spark.udf.register(name,func)來注冊一個UDF函數(shù),name是UDF調(diào)用時的標識符,fun是一個函數(shù),用于處理字段。
需要將一個DF或者DS注冊為一個臨時表
通過spark.sql去運行一個SQL語句,在SQL語句中可以通過name(列名)方式來應(yīng)用UDF函數(shù)
用戶自定義聚合函數(shù)
1. 弱類型用戶自定義聚合函數(shù)
新建一個Class 繼承UserDefinedAggregateFunction ,然后復(fù)寫方法:
//聚合函數(shù)需要輸入參數(shù)的數(shù)據(jù)類型
override def inputSchema: StructType = ???
//可以理解為保存聚合函數(shù)業(yè)務(wù)邏輯數(shù)據(jù)的一個數(shù)據(jù)結(jié)構(gòu)
override def bufferSchema: StructType = ???
// 返回值的數(shù)據(jù)類型
override def dataType: DataType = ???
// 對于相同的輸入一直有相同的輸出
override def deterministic: Boolean = true
//用于初始化你的數(shù)據(jù)結(jié)構(gòu)
override def initialize(buffer: MutableAggregationBuffer): Unit = ???
//用于同分區(qū)內(nèi)Row對聚合函數(shù)的更新操作
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ???
//用于不同分區(qū)對聚合結(jié)果的聚合。
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ???
//計算最終結(jié)果
override def evaluate(buffer: Row): Any = ???
你需要通過spark.udf.resigter去注冊你的UDAF函數(shù)。
需要通過spark.sql去運行你的SQL語句,可以通過 select UDAF(列名) 來應(yīng)用你的用戶自定義聚合函數(shù)。
2、強類型用戶自定義聚合函數(shù)
新建一個class,繼承Aggregator[Employee, Average, Double],其中Employee是在應(yīng)用聚合函數(shù)的時候傳入的對象,Average是聚合函數(shù)在運行的時候內(nèi)部需要的數(shù)據(jù)結(jié)構(gòu),Double是聚合函數(shù)最終需要輸出的類型。這些可以根據(jù)自己的業(yè)務(wù)需求去調(diào)整。復(fù)寫相對應(yīng)的方法:
//用于定義一個聚合函數(shù)內(nèi)部需要的數(shù)據(jù)結(jié)構(gòu)
override def zero: Average = ???
//針對每個分區(qū)內(nèi)部每一個輸入來更新你的數(shù)據(jù)結(jié)構(gòu)
override def reduce(b: Average, a: Employee): Average = ???
//用于對于不同分區(qū)的結(jié)構(gòu)進行聚合
override def merge(b1: Average, b2: Average): Average = ???
//計算輸出
override def finish(reduction: Average): Double = ???
//用于數(shù)據(jù)結(jié)構(gòu)他的轉(zhuǎn)換
override def bufferEncoder: Encoder[Average] = ???
//用于最終結(jié)果的轉(zhuǎn)換
override def outputEncoder: Encoder[Double] = ???
新建一個UDAF實例,通過DF或者DS的DSL風(fēng)格語法去應(yīng)用。
五、Spark SQL和Hive的繼承
1、內(nèi)置Hive
Spark內(nèi)置有Hive,Spark2.1.1 內(nèi)置的Hive是1.2.1。
需要將core-site.xml和hdfs-site.xml 拷貝到spark的conf目錄下。如果Spark路徑下發(fā)現(xiàn)metastore_db,需要刪除【僅第一次啟動的時候】。
在你第一次啟動創(chuàng)建metastore的時候,你需要指定spark.sql.warehouse.dir這個參數(shù), 比如:bin/spark-shell --conf spark.sql.warehouse.dir=hdfs://master01:9000/spark_warehouse
注意,如果你在load數(shù)據(jù)的時候,需要將數(shù)據(jù)放到HDFS上。
2、外部Hive(這里主要使用這個方法)
需要將hive-site.xml 拷貝到spark的conf目錄下。
如果hive的metestore使用的是mysql數(shù)據(jù)庫,那么需要將mysql的jdbc驅(qū)動包放到spark的jars目錄下。
可以通過spark-sql或者spark-shell來進行sql的查詢。完成和hive的連接。
這就是hive里面的表
六、Spark SQL的數(shù)據(jù)源
1、輸入
對于Spark SQL的輸入需要使用sparkSession.read方法
通用模式 sparkSession.read.format(“json”).load(“path”) 支持類型:parquet、json、text、csv、orc、jdbc
專業(yè)模式 sparkSession.read.json、 csv 直接指定類型。
2、輸出
對于Spark SQL的輸出需要使用 sparkSession.write方法
通用模式 dataFrame.write.format(“json”).save(“path”) 支持類型:parquet、json、text、csv、orc
專業(yè)模式 dataFrame.write.csv(“path”) 直接指定類型
如果你使用通用模式,spark默認parquet是默認格式、sparkSession.read.load 加載的默認是parquet格式dataFrame.write.save也是默認保存成parquet格式。
如果需要保存成一個text文件,那么需要dataFrame里面只有一列(只需要一列即可)。
七、Spark SQL實戰(zhàn)
1、數(shù)據(jù)說明
這里有三個數(shù)據(jù)集,合起來大概有幾十萬條數(shù)據(jù),是關(guān)于貨品交易的數(shù)據(jù)集。
2、任務(wù)
這里有三個需求:
計算所有訂單中每年的銷售單數(shù)、銷售總額
計算所有訂單每年最大金額訂單的銷售額
計算所有訂單中每年最暢銷貨品
3、步驟
1. 加載數(shù)據(jù)
tbStock.txt
#代碼case class tbStock(ordernumber:String,locationid:String,dateid:String) extends Serializable
val tbStockRdd=spark.sparkContext.textFile(“file:///root/dataset/tbStock.txt”)
val tbStockDS=tbStockRdd.map(_.split(“,”)).map(attr=》tbStock(attr(0),attr(1),attr(2))).toDS
tbStockDS.show()
tbStockDetail.txt
case class tbStockDetail(ordernumber:String,rownum:Int,itemid:String,number:Int,price:Double,amount:Double) extends Serializable
val tbStockDetailRdd=spark.sparkContext.textFile(“file:///root/dataset/tbStockDetail.txt”)
val tbStockDetailDS=tbStockDetailRdd.map(_.split(“,”)).map(attr=》tbStockDetail(attr(0),attr(1).trim().toInt,attr(2),attr(3).trim().toInt,attr(4).trim().toDouble,attr(5).trim().toDouble)).toDS
tbStockDetailDS.show()
tbDate.txt
case class tbDate(dateid:String,years:Int,theyear:Int,month:Int,day:Int,weekday:Int,week:Int,quarter:Int,period:Int,halfmonth:Int) extends Serializable
val tbDateRdd=spark.sparkContext.textFile(“file:///root/dataset/tbDate.txt”)
val tbDateDS=tbDateRdd.map(_.split(“,”)).map(attr=》tbDate(attr(0),attr(1).trim().toInt,attr(2).trim().toInt,attr(3).trim().toInt,attr(4).trim().toInt,attr(5).trim().toInt,attr(6).trim().toInt,attr(7).trim().toInt,attr(8).trim().toInt,attr(9).trim().toInt)).toDS
tbDateDS.show()
2. 注冊表
tbStockDS.createOrReplaceTempView(“tbStock”)
tbDateDS.createOrReplaceTempView(“tbDate”)
tbStockDetailDS.createOrReplaceTempView(“tbStockDetail”)
3. 解析表
計算所有訂單中每年的銷售單數(shù)、銷售總額
#sql語句
select c.theyear,count(distinct a.ordernumber),sum(b.amount)
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear
order by c.theyear
計算所有訂單每年最大金額訂單的銷售額
a、先統(tǒng)計每年每個訂單的銷售額
select a.dateid,a.ordernumber,sum(b.amount) as SumOfAmount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
group by a.dateid,a.ordernumber
b、計算最大金額訂單的銷售額
select d.theyear,c.SumOfAmount as SumOfAmount
from
(select a.dateid,a.ordernumber,sum(b.amount) as SumOfAmount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
group by a.dateid,a.ordernumber) c
join tbDate d on c.dateid=d.dateid
group by d.theyear
order by theyear desc
計算所有訂單中每年最暢銷貨品
a、求出每年每個貨品的銷售額
select c.theyear,b.itemid,sum(b.amount) as SumOfAmount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear,b.itemid
b、在a的基礎(chǔ)上,統(tǒng)計每年單個貨品的最大金額
select d.theyear,max(d.SumOfAmount) as MaxOfAmount
from
(select c.theyear,b.itemid,sum(b.amount) as SumOfAmount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear,b.itemid) d
group by theyear
c、用最大銷售額和統(tǒng)計好的每個貨品的銷售額join,以及用年join,集合得到最暢銷貨品那一行信息
select distinct e.theyear,e.itemid,f.maxofamount
from
(select c.theyear,b.itemid,sum(b.amount) as sumofamount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear,b.itemid) e
join
(select d.theyear,max(d.sumofamount) as maxofamount
from
(select c.theyear,b.itemid,sum(b.amount) as sumofamount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear,b.itemid) d
group by d.theyear) f on e.theyear=f.theyear
and e.sumofamount=f.maxofamount order by e.theyear
編輯:jq
-
數(shù)據(jù)
+關(guān)注
關(guān)注
8文章
7067瀏覽量
89108 -
SQL
+關(guān)注
關(guān)注
1文章
766瀏覽量
44161 -
函數(shù)
+關(guān)注
關(guān)注
3文章
4333瀏覽量
62687 -
RDD
+關(guān)注
關(guān)注
0文章
7瀏覽量
7983
原文標題:Spark SQL 重點知識總結(jié)
文章出處:【微信號:DBDevs,微信公眾號:數(shù)據(jù)分析與開發(fā)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論