Expand 算子的實現(xiàn)
Expand 算子在 Spark SQL 源碼中的實現(xiàn)為 ExpandExec
類(Spark SQL 中的算子實現(xiàn)類的命名都是 XxxExec
的格式,其中 Xxx
為具體的算子名,比如 Project 算子的實現(xiàn)類為 ProjectExec
),核心代碼如下:
/**
* Apply all of the GroupExpressions to every input row, hence we will get
* multiple output rows for an input row.
* @param projections The group of expressions, all of the group expressions should
* output the same schema specified bye the parameter `output`
* @param output The output Schema
* @param child Child operator
*/
case class ExpandExec(
projections: Seq[Seq[Expression]],
output: Seq[Attribute],
child: SparkPlan)
extends UnaryExecNode with CodegenSupport {
...
// 關(guān)鍵點1,將child.output,也即上游算子輸出數(shù)據(jù)的schema,
// 綁定到表達(dá)式數(shù)組exprs,以此來計算輸出數(shù)據(jù)
private[this] val projection =
(exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
// doExecute()方法為Expand算子執(zhí)行邏輯所在
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
// 處理上游算子的輸出數(shù)據(jù),Expand算子的輸入數(shù)據(jù)就從iter迭代器獲取
child.execute().mapPartitions { iter =>
// 關(guān)鍵點2,projections對應(yīng)了Grouping Sets里面每個grouping set的表達(dá)式,
// 表達(dá)式輸出數(shù)據(jù)的schema為this.output, 比如 (quantity, city, car_model, spark_grouping_id)
// 這里的邏輯是為它們各自生成一個UnsafeProjection對象,通過該對象的apply方法就能得出Expand算子的輸出數(shù)據(jù)
val groups = projections.map(projection).toArray
new Iterator[InternalRow] {
private[this] var result: InternalRow = _
private[this] var idx = -1 // -1 means the initial state
private[this] var input: InternalRow = _
override final def hasNext: Boolean = (-1 < idx && idx < groups.length) || iter.hasNext
override final def next(): InternalRow = {
// 關(guān)鍵點3,對于輸入數(shù)據(jù)的每一條記錄,都重復(fù)使用N次,其中N的大小對應(yīng)了projections數(shù)組的大小,
// 也即Grouping Sets里指定的grouping set的數(shù)量
if (idx <= 0) {
// in the initial (-1) or beginning(0) of a new input row, fetch the next input tuple
input = iter.next()
idx = 0
}
// 關(guān)鍵點4,對輸入數(shù)據(jù)的每一條記錄,通過UnsafeProjection計算得出輸出數(shù)據(jù),
// 每個grouping set對應(yīng)的UnsafeProjection都會對同一個input計算一遍
result = groups(idx)(input)
idx += 1
if (idx == groups.length && iter.hasNext) {
idx = 0
}
numOutputRows += 1
result
}
}
}
}
...
}
ExpandExec
的實現(xiàn)并不復(fù)雜,想要理解它的運作原理,關(guān)鍵是看懂上述源碼中提到的 4 個關(guān)鍵點。
關(guān)鍵點 1
和 關(guān)鍵點 2
是基礎(chǔ),關(guān)鍵點 2
中的 groups
是一個 UnsafeProjection[N]
數(shù)組類型,其中每個 UnsafeProjection
代表了 Grouping Sets
語句里指定的 grouping set,它的定義是這樣的:
// A projection that returns UnsafeRow.
abstract class UnsafeProjection extends Projection {
override def apply(row: InternalRow): UnsafeRow
}
// The factory object for `UnsafeProjection`.
object UnsafeProjection
extends CodeGeneratorWithInterpretedFallback[Seq[Expression], UnsafeProjection] {
// Returns an UnsafeProjection for given sequence of Expressions, which will be bound to
// `inputSchema`.
def create(exprs: Seq[Expression], inputSchema: Seq[Attribute]): UnsafeProjection = {
create(bindReferences(exprs, inputSchema))
}
...
}
UnsafeProjection
起來了類似列投影的作用,其中, apply
方法根據(jù)創(chuàng)建時的傳參 exprs
和 inputSchema
,對輸入記錄進(jìn)行列投影,得出輸出記錄。
比如,前面的 GROUPING SETS ((city, car_model), (city), (car_model), ())
例子,它對應(yīng)的 groups
是這樣的:
其中,AttributeReference
類型的表達(dá)式,在計算時,會直接引用輸入數(shù)據(jù)對應(yīng)列的值;Iteral
類型的表達(dá)式,在計算時,值是固定的。
關(guān)鍵點 3
和 關(guān)鍵點 4
是 Expand 算子的精華所在,ExpandExec
通過這兩段邏輯,將每一個輸入記錄, 擴展(Expand) 成 N 條輸出記錄。
關(guān)鍵點 4
中groups(idx)(input)
等同于groups(idx).apply(input)
。
還是以前面 GROUPING SETS ((city, car_model), (city), (car_model), ())
為例子,效果是這樣的:
到這里,我們已經(jīng)弄清楚 Expand 算子的工作原理,再回頭看前面提到的 3 個問題,也不難回答了:
- Expand 的實現(xiàn)邏輯是怎樣的,為什么能達(dá)到
Union All
的效果?
如果說Union All
是先聚合再聯(lián)合,那么 Expand 就是先聯(lián)合再聚合。Expand 利用groups
里的 N 個表達(dá)式對每條輸入記錄進(jìn)行計算,擴展成 N 條輸出記錄。后面再聚合時,就能達(dá)到與Union All
一樣的效果了。 - Expand 節(jié)點的輸出數(shù)據(jù)是怎樣的 ?
在 schema 上,Expand 輸出數(shù)據(jù)會比輸入數(shù)據(jù)多出 spark_grouping_id
列;在記錄數(shù)上,是輸入數(shù)據(jù)記錄數(shù)的 N 倍。
spark_grouping_id
列的作用是什么 ?
spark_grouping_id
給每個 grouping set 進(jìn)行編號,這樣,即使在 Expand 階段把數(shù)據(jù)先聯(lián)合起來,在 Aggregate 階段(把 spark_grouping_id
加入到分組規(guī)則)也能保證數(shù)據(jù)能夠按照每個 grouping set 分別聚合,確保了結(jié)果的正確性。
查詢性能對比
從前文可知,Grouping Sets 和 Union All 兩個版本的 SQL 語句有著一樣的效果,但是它們的執(zhí)行計劃卻有著巨大的差別。下面,我們將比對兩個版本之間的執(zhí)行性能差異。
spark-sql 執(zhí)行完 SQL 語句之后會打印耗時信息,我們對兩個版本的 SQL 分別執(zhí)行 10 次,得到如下信息:
// Grouping Sets 版本執(zhí)行10次的耗時信息
// SELECT city, car_model, sum(quantity) AS sum FROM dealer GROUP BY GROUPING SETS ((city, car_model), (city), (car_model), ()) ORDER BY city, car_model;
Time taken: 0.289 seconds, Fetched 15 row(s)
Time taken: 0.251 seconds, Fetched 15 row(s)
Time taken: 0.259 seconds, Fetched 15 row(s)
Time taken: 0.258 seconds, Fetched 15 row(s)
Time taken: 0.296 seconds, Fetched 15 row(s)
Time taken: 0.247 seconds, Fetched 15 row(s)
Time taken: 0.298 seconds, Fetched 15 row(s)
Time taken: 0.286 seconds, Fetched 15 row(s)
Time taken: 0.292 seconds, Fetched 15 row(s)
Time taken: 0.282 seconds, Fetched 15 row(s)
// Union All 版本執(zhí)行10次的耗時信息
// (SELECT city, car_model, sum(quantity) AS sum FROM dealer GROUP BY city, car_model) UNION ALL (SELECT city, NULL as car_model, sum(quantity) AS sum FROM dealer GROUP BY city) UNION ALL (SELECT NULL as city, car_model, sum(quantity) AS sum FROM dealer GROUP BY car_model) UNION ALL (SELECT NULL as city, NULL as car_model, sum(quantity) AS sum FROM dealer) ORDER BY city, car_model;
Time taken: 0.628 seconds, Fetched 15 row(s)
Time taken: 0.594 seconds, Fetched 15 row(s)
Time taken: 0.591 seconds, Fetched 15 row(s)
Time taken: 0.607 seconds, Fetched 15 row(s)
Time taken: 0.616 seconds, Fetched 15 row(s)
Time taken: 0.64 seconds, Fetched 15 row(s)
Time taken: 0.623 seconds, Fetched 15 row(s)
Time taken: 0.625 seconds, Fetched 15 row(s)
Time taken: 0.62 seconds, Fetched 15 row(s)
Time taken: 0.62 seconds, Fetched 15 row(s)
可以算出,Grouping Sets 版本的 SQL 平均耗時為 0.276s ;Union All 版本的 SQL 平均耗時為 0.616s ,是前者的 2.2 倍 !
所以, Grouping Sets 版本的 SQL 不僅在表達(dá)上更加簡潔,在性能上也更加高效 。
RollUp 和 Cube
Group By
的高級用法中,還有 RollUp
和 Cube
兩個比較常用。
首先,我們看下 RollUp
語句 。
Spark SQL 官方文檔中 SQL Syntax 一節(jié)對 RollUp
語句的描述如下:
Specifies multiple levels of aggregations in a single statement. This clause is used to compute aggregations based on multiple grouping sets.
ROLLUP
is a shorthand forGROUPING SETS
. (... 一些例子)
官方文檔中,把 RollUp
描述為 Grouping Sets
的簡寫,等價規(guī)則為:RollUp(A, B, C) == Grouping Sets((A, B, C), (A, B), (A), ())
。
比如,Group By RollUp(city, car_model)
就等同于 Group By Grouping Sets((city, car_model), (city), ())
。
下面,我們通過 expand extended
看下 RollUp 版本 SQL 的 Optimized Logical Plan:
spark-sql> explain extended SELECT city, car_model, sum(quantity) AS sum FROM dealer GROUP BY ROLLUP(city, car_model) ORDER BY city, car_model;
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
Sort [city#2164 ASC NULLS FIRST, car_model#2165 ASC NULLS FIRST], true
+- Aggregate [city#2164, car_model#2165, spark_grouping_id#2163L], [city#2164, car_model#2165, sum(quantity#2159) AS sum#2150L]
+- Expand [[quantity#2159, city#2157, car_model#2158, 0], [quantity#2159, city#2157, null, 1], [quantity#2159, null, null, 3]], [quantity#2159, city#2164, car_model#2165, spark_grouping_id#2163L]
+- Project [quantity#2159, city#2157, car_model#2158]
+- HiveTableRelation [`default`.`dealer`, ..., Data Cols: [id#2156, city#2157, car_model#2158, quantity#2159], Partition Cols: []]
== Physical Plan ==
...
從上述 Plan 可以看出,RollUp
底層實現(xiàn)用的也是 Expand 算子,說明 RollUp
確實是基于 Grouping Sets
實現(xiàn)的。 而且 Expand [[quantity#2159, city#2157, car_model#2158, 0], [quantity#2159, city#2157, null, 1], [quantity#2159, null, null, 3]]
也表明 RollUp
符合等價規(guī)則。
下面,我們按照同樣的思路,看下 Cube
語句 。
Spark SQL 官方文檔中 SQL Syntax 一節(jié)對 Cube
語句的描述如下:
CUBE
clause is used to perform aggregations based on combination of grouping columns specified in theGROUP BY
clause.CUBE
is a shorthand forGROUPING SETS
. (... 一些例子)
同樣,官方文檔把 Cube
描述為 Grouping Sets
的簡寫,等價規(guī)則為:Cube(A, B, C) == Grouping Sets((A, B, C), (A, B), (A, C), (B, C), (A), (B), (C), ())
。
比如,Group By Cube(city, car_model)
就等同于 Group By Grouping Sets((city, car_model), (city), (car_model), ())
。
下面,我們通過 expand extended
看下 Cube 版本 SQL 的 Optimized Logical Plan:
spark-sql> explain extended SELECT city, car_model, sum(quantity) AS sum FROM dealer GROUP BY CUBE(city, car_model) ORDER BY city, car_model;
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
Sort [city#2202 ASC NULLS FIRST, car_model#2203 ASC NULLS FIRST], true
+- Aggregate [city#2202, car_model#2203, spark_grouping_id#2201L], [city#2202, car_model#2203, sum(quantity#2197) AS sum#2188L]
+- Expand [[quantity#2197, city#2195, car_model#2196, 0], [quantity#2197, city#2195, null, 1], [quantity#2197, null, car_model#2196, 2], [quantity#2197, null, null, 3]], [quantity#2197, city#2202, car_model#2203, spark_grouping_id#2201L]
+- Project [quantity#2197, city#2195, car_model#2196]
+- HiveTableRelation [`default`.`dealer`, ..., Data Cols: [id#2194, city#2195, car_model#2196, quantity#2197], Partition Cols: []]
== Physical Plan ==
...
從上述 Plan 可以看出,Cube
底層用的也是 Expand 算子,說明 Cube
確實基于 Grouping Sets
實現(xiàn),而且也符合等價規(guī)則。
所以,RollUp
和 Cube
可以看成是 Grouping Sets
的語法糖,在底層實現(xiàn)和性能上是一樣的。
最后
本文重點討論了 Group By
高級用法 Groupings Sets
語句的功能和底層實現(xiàn)。
雖然 Groupings Sets
的功能,通過 Union All
也能實現(xiàn),但前者并非后者的語法糖,它們的底層實現(xiàn)完全不一樣。Grouping Sets
采用的是先聯(lián)合再聚合的思路,通過 spark_grouping_id
列來保證數(shù)據(jù)的正確性;Union All
則采用先聚合再聯(lián)合的思路。Grouping Sets
在 SQL 語句表達(dá)和性能上都有更大的優(yōu)勢 。
Group By
的另外兩個高級用法 RollUp
和 Cube
則可以看成是 Grouping Sets
的語法糖,它們的底層都是基于 Expand 算子實現(xiàn), 在性能上與直接使用 Grouping Sets
是一樣的,但在 SQL 表達(dá)上更加簡潔 。
-
數(shù)據(jù)
+關(guān)注
關(guān)注
8文章
7067瀏覽量
89110 -
SQL
+關(guān)注
關(guān)注
1文章
766瀏覽量
44161 -
函數(shù)
+關(guān)注
關(guān)注
3文章
4333瀏覽量
62691
發(fā)布評論請先 登錄
相關(guān)推薦
評論