主要內容本篇主要從FlinkSQL實現的內核與原理,工作流等的視角帶大家構建一幅FlinkSQL全景圖(以Blink為主介紹),探知背后支撐的“男人們”(組件)。建議收藏,僅此一份。
主要內容:
1. Table API 與 SQL
2. Apache Calcite
3. 元數據
4. SQL 函數
5. Flink Planner 與 Blink Planner
6. Blink SQL執行過程
7. SQL優化器
8. 總結
Table API 與 Table SQLTable API 和 Table SQL 集成在同一套 API 中。這套 API 的核心概念是Table,用作查詢的輸入和輸出。
Apache Flink 具有兩個關系型 API - Table API 和 Table SQL - 用于統一的流和批處理。Table API 是 Scala 和 Java 的語言集成查詢 API,它允許用非常直觀的方式從關系運算符(如選擇、過濾和連接)組成查詢。Flink 的 SQL 支持是基于 Apache Calcite,它實現了 SQL 標準。無論輸入是批處理輸入(DataSet)還是流輸入(DataStream),在任一接口中指定的查詢都具有相同的語義,并指定相同的結果。
Table API 和 SQL 接口與 Flink 的 DataStream 和 DataSet API 緊密集成。你可以很容易地在所有 API 和建立在 API 基礎上的庫之間切換。
Apache CalciteCalcite 是什么
Apache Calcite是一款開源的動態數據管理框架,它提供了標準的 SQL 語言、多種查詢優化和連接各種數據源的能力,但不包括數據存儲、處理數據的算法和存儲元數據的存儲庫。
Calcite采用的是業界大數據查詢框架的一種通用思路,它的目標是“one size fits all(一種方案適應所有需求場景)”,希望能為不同計算平臺和數據源提供統一的查詢引擎。
Calcite作為一個強大的SQL計算引擎,在Flink內部的SQL引擎模塊就是基于Calcite。
Calcite 的特點
支持標準SQL語言;
獨立于編程語言和數據源,可以支持不同的前端和后端;
支持關系代數、可定制的邏輯規則和基于成本模型優化的查詢引擎;
支持物化視圖(materialized view)的管理(創建、丟棄、持久化和自動識別);
基于物化視圖的Lattice和Tile機制,以應用于OLAP分析;
支持對流數據的查詢。
Calcite 的功能
1. SQL 解析
Calcite 的SQL解析是通過JavaCC實現的,使用JavaCC編寫SQL語法描述文件,將SQL解析成未經校驗的AST語法樹。
2. SQL 校驗
無狀態的校驗:驗證SQL語句是否符合規范。有狀態的校驗:通過與元數據結合驗證SQL的Schema,Field,Function是否存在,輸入和輸出是否符合。
3. 查詢優化
對RelNode和邏輯計劃樹進行優化,得到優化后的生成物理執行計劃。
4. SQL 生成器
將物理執行計劃生成特定平臺的可執行程序,比如Flink,Hive,不同規則的SQL查詢語句。
5. 執行
通過各個執行平臺在內存中編譯,然后執行查詢。
FlinkSQL 結合 Calcite
一條SQL從提交到Calcite解析,優化,到最后的Flink執行,一般分以下過程:
1. Sql Parser: 將sql語句通過java cc解析成AST(語法樹),在calcite中用SqlNode表示AST;
2. Sql Validator: 結合數字字典(catalog)去驗證sql語法;
3. 生成Logical Plan: 將sqlNode表示的AST轉換成LogicalPlan, 用relNode表示;
4. 生成 optimized LogicalPlan: 先基于calcite rules 去優化logical Plan,基于flink定制的一些優化rules去優化logical Plan;
5. 生成Flink PhysicalPlan: 這里也是基于flink里頭的rules將,將optimized LogicalPlan轉成成Flink的物理執行計劃;
6. 將物理執行計劃轉成Flink ExecutionPlan: 就是調用相應的tanslateToPlan方法轉換和利用CodeGen元編程成Flink的各種算子。
Table API 來提交任務的話,基本流程和運行SQL類似,稍微不同的是:table api parser: flink會把table api表達的計算邏輯也表示成一顆樹,用treeNode去表式;在這棵樹上的每個節點的計算邏輯用Expression來表示。
簡單說一下SQL優化:RBO(基于規則)
RBO主要是開發人員在使用SQL的過程中,有些發現有些通用的規則,可以顯著提高SQL執行的效率,比如最經典的filter下推:
將Filter下推到Join之前執行,這樣做的好處是減少了Join的數量,同時降低了CPU,內存,網絡等方面的開銷,提高效率。
SQL優化的發展,則可以分為兩個階段,即RBO(基于規則),和CBO(基于代價)
RBO和CBO的區別大概在于: RBO只為應用提供的rule,而CBO會根據給出的Cost信息,智能應用rule,求出一個Cost最低的執行計劃。需要糾正很多人誤區的一點是,CBO其實也是基于rule的,接觸到RBO和CBO這兩個概念的時候,很容易將他們對立起來。但實際上CBO,可以理解為就是加上Cost的RBO。
元數據Catalog 提供了元數據信息,例如數據庫、表、分區、視圖以及數據庫或其他外部系統中存儲的函數和信息。
數據處理最關鍵的方面之一是管理元數據。元數據可以是臨時的,例如臨時表、或者通過 TableEnvironment 注冊的 UDF。元數據也可以是持久化的,例如 Hive Metastore 中的元數據。Catalog 提供了一個統一的API,用于管理元數據,并使其可以從 Table API 和 SQL 查詢語句中來訪問。
1. 目前支持的類型
(1) GenericInMemoryCatalog
是基于內存實現的 Catalog,所有元數據只在 session 的生命周期內可用。
(2) JdbcCatalog
JdbcCatalog 使得用戶可以將 Flink 通過 JDBC 協議連接到關系數據庫。PostgresCatalog 是當前實現的唯一一種 JDBC Catalog。
(3) HiveCatalog
HiveCatalog 有兩個用途:作為原生 Flink 元數據的持久化存儲,以及作為讀寫現有 Hive 元數據的接口。
(4) 用戶自定義 Catalog
Catalog 是可擴展的,用戶可以通過實現 Catalog 接口來開發自定義 Catalog。想要在 SQL CLI 中使用自定義 Catalog,用戶除了需要實現自定義的 Catalog 之外,還需要為這個 Catalog 實現對應的 CatalogFactory 接口。
CatalogFactory 定義了一組屬性,用于 SQL CLI 啟動時配置 Catalog。這組屬性集將傳遞給發現服務,在該服務中,服務會嘗試將屬性關聯到 CatalogFactory 并初始化相應的 Catalog 實例。
2. 元數據分類
catalog定義主要有三種數據類型接口,也就是常用到的數據庫,表&視圖,函數。當然還有最上層的Catalog容器。
(1) 數據庫
等同于數據庫中庫的實例,接口定義為CatalogDatabase,定義數據庫實例的元數據,一個數據庫實例中包含表,視圖,函數等多種對象。
(2) 表&視圖
CatalogTable對應數據庫中的表,CatalogView隊形數據庫中的視圖。
表是一種存儲的實體,包換了字段信息,表的分區,屬性,描述信息。其實說白了字段定義和之前印象的數據庫很是類似。你可以對比過來。不同的是,拿flink來說,所有的表都是外部數據源,除了上面所說的,還需要訪問信息,比如IP端口,mater地址,connector連接類等等。
視圖是一個虛擬概念,本質上是一條SQL查詢語句,底層對應一張表或者多張表。包含SQL查詢語句,視圖的字段信息,視圖的屬性等等的信息。
(3) 函數
CatalogFunction是函數元數據的接口。函數元數據包含了所在的類信息和編程語言。
3. 數據訪問
Flink的Table API和SQL程序可以連接到其他外部系統,用于讀和寫批處理表和流表。source table提供對存儲在外部系統(如數據庫、消息隊列或文件系統)中的數據的訪問。sink table 向外部存儲系統發送表。根據source和sink器的類型,它們支持不同的格式,如CSV、Avro、Parquet或ORC。
(1) TableSchema
Table Source 和 Sink需要具備對外數據源的描述能力,所以Flink定義了TableSchema對象來定義字段名稱和字段類型,存儲格式等等信息
(2) 時間屬性
支持處理時間和時間時間
(3) Watermark
用來處理亂序的數據。
4. Table Source & Table Sink
Flink本地支持各種連接器,可以查看往期總結
Filesystem
Elasticsearch
Apache Kafka
Amazon Kinesis Data Streams
JDBC
Apache HBase
Apache Hive
幾個主要Table Source與Sink體系
(1) StreamTableSource
流數據抽象,區分了無界數據與有界數據。
(2) LookupableTableSource
按照Join條件中的字段進行關聯。
(3) FilterableTableSource
過濾不符合條件的記錄。
(4) LimitableTableSource
限制記錄條數。
(5) ProjectableTableSource
過濾不會被使用的字段。
(6) AppendStreamTableSink
追加模式的TableSink 支持追加,不支持更新。
(7) RetractStreamTableSink
支持召回模式的TableSink,召回模式其實就是流上的update。
(8) UpsertStreamTableSink
有則更新,無則插入
SQL 函數臨時函數和持久化函數。臨時函數始終由用戶創建,它容易改變并且僅在會話的生命周期內有效。持久化函數不是由系統提供,就是存儲在 Catalog 中,它在會話的整個生命周期內都有效。
內置函數
Table API和SQL為用戶提供了一組用于數據轉換的內置函數。如果您需要的函數還不受支持,您可以實現用戶定義的函數
(1) Comparison Functions(比較型函數)
eg:value1 = value2
(2) Logical Functions(邏輯函數)
eg: boolean1 OR boolean2
(3) Arithmetic Functions(算術函數)
eg: numeric1 + numeric2
(4) String Functions(字符串函數)
UPPER(string)
(5) Temporal Functions(時間函數)
YEAR(date)
(6) Conditional Functions(有條件的函數)
IF(condition, true_value, false_value)
(7) Type Conversion Functions(類型轉換函數)
CAST(value AS type)
(8) Collection Functions(集合函數)
array ‘[’ INT ‘]’
(9) Value Construction Functions , Value Access Functions,Grouping Functions,Hash Functions,Auxiliary Functions,Aggregate Functions,Column Functions (不一一列舉)
自定義函數
(1) 標量函數(UDF)
標量函數 將標量值轉換成一個新標量值,也就是對一行數據中的一個或者多個字段返回一個單值。
(2) 聚合函數(UDAGG)
自定義聚合函數(UDAGG)是把一個表(一行或者多行,每行可以有一列或者多列)聚合成一個標量值。
(3) 表值函數(UDTF)
表值函數 將標量值轉換成新的行數據。可以接收一個或者多個字段作為參數,輸出多行列數據。
(4) 表值聚合函數(UDTAGG)
自定義表值聚合函數(UDTAGG)可以把一個表(一行或者多行,每行有一列或者多列)聚合成另一張表,結果中可以有多行多列。
(5) 異步表值函數
異步表值函數 是異步查詢外部數據系統的特殊函數。
Planner 與 Blink PlannerFlink Table/SQL體系中的Planner(即查詢處理器)是溝通Flink與Calcite的橋梁,為Table/SQL API提供完整的解析、優化和執行環境。
Flink Table 的新架構實現了查詢處理器的插件化,項目完整保留原有 Flink Planner (Old Planner),同時又引入了新的 Blink Planner,用戶可以自行選擇使用 Old Planner 還是 Blink Planner。
主要區別:
Blink做到了真正的流批統一,即將批看做是特殊的流,把處理批的API和處理流的API做成了一樣的。也就是說不管是批數據還是流數據,底層統統都是DataStream。所以使用Blink作為table planner的程序,Table和DataSet是不能相互轉換的。
Blink planner是不支持BatchTableSource的,它只支持StreamTableSource。
Blink Planner和Old Planner的FilterableTableSource是不兼容的。Old - Planner會下推PlannerExpression到FilterableTableSource。而Blink planner下推的是Expression。
基于String的鍵值對配置項只能用于Blink Planner
Blink Planner會優化多個sink到同一個TableEnvironment和StreamTableEnvironment。而Old Planner會為不同的sink優化到自己的DAG中,也就是說有幾個sink就有幾個DAG。
Old Planner 不支持 catalog統計,Blink支持。
Old Planner 不支持版本表(versioned Table)。版本表類似HBASE中版本表的意思,每個key可以記住過去的幾個值。
Blink SQL執行過程
SQL執行過程分三個階段
(1) 從SQL到 Operation
(2) 從Operation 到 Transformation
(3) 環境的執行階段
從SQL到 Operation
(1) 解析SQL轉換為QueryOperation;
(2) SQL解析為SqlNode;
(3) 校驗SqlNode;
(4) 調用Calcite SQLToRelConvertrt將SqlNode轉化為RelNode邏輯樹;
(5) RelNode轉化為Operation。
Operation 到 Transformation
(1) DQL(數據查詢語言)轉換,在flink中作為中間運算;
(2) DML(數據操作語言),DQL轉換。
整個轉換從Operation開始,先轉換為Calcite的邏輯計劃樹,再轉化為Flink的邏輯計劃樹,然后進行優化。優化后的邏輯樹轉換為Flink的物理執行,物理執行生成一系列的算子,udf等等,包裝到Transformation中。
環境的執行階段
有了Transformation后正式進入到StreamGraph的過程中,最終交給Flink集群去運行。
SQL優化器查詢優化器
再次提到兩個優化器:RBO(基于規則的優化器) 和 CBO(基于代價的優化器)
(1) RBO(基于規則的優化器)會將原有表達式裁剪掉,遍歷一系列規則(Rule),只要滿足條件就轉換,生成最終的執行計劃。一些常見的規則包括分區裁剪(Partition Prune)、列裁剪、謂詞下推(Predicate Pushdown)、投影下推(Projection Pushdown)、聚合下推、limit下推、sort下推、常量折疊(Constant Folding)、子查詢內聯轉join等。
(2) CBO(基于代價的優化器)會將原有表達式保留,基于統計信息和代價模型,嘗試探索生成等價關系表達式,最終取代價最小的執行計劃。CBO的實現有兩種模型,Volcano模型,Cascades模型。這兩種模型思想很是相似,不同點在于Cascades模型一邊遍歷SQL邏輯樹,一邊優化,從而進一步裁剪掉一些執行計劃。
目前各大數據庫和計算引擎傾向于CBO。
總結在目前情況下,在阿里對Flink社區的貢獻下,Flink包含了Flink SQL 和 Blink SQL體系,Flink Planner稱之為 Old Planner,Blink Planner稱之為 New Planner。從中可以發現 Blink Planner是未來,Flink Planner將會被淘汰。
FlinkSQL依靠 Calcite提供了一套SQL驗證,解析,優化等等操作。同時FlinkSQL提供元數據管理,SQL函數,數據源的建設。也自由化地提供了自定義函數,自定義connector連接,豐富了場景的使用。
FlinkSQL你值得擁有!!!
編輯:jq
-
處理器
+關注
關注
68文章
19259瀏覽量
229655 -
數據管理
+關注
關注
1文章
294瀏覽量
19610 -
SQL
+關注
關注
1文章
762瀏覽量
44117 -
UDF
+關注
關注
0文章
4瀏覽量
6468
原文標題:干貨:詳解 FlinkSQL 實現原理
文章出處:【微信號:DBDevs,微信公眾號:數據分析與開發】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論