作者:京東物流 吳云濤
前言
提交一個(gè)DataSteam 的 Flink應(yīng)用,需要經(jīng)過(guò) StreamGraph、JobGraph、ExecutionGraph 三個(gè)階段的轉(zhuǎn)換生成可成執(zhí)行的有向無(wú)環(huán)圖(DAG),并在 Flink 集群上運(yùn)行。而提交一個(gè) Flink SQL 應(yīng)用,其執(zhí)行流程也類(lèi)似,只是多了一步使用 flink-table-planer 模塊從SQL轉(zhuǎn)換成 StreamGraph 的過(guò)程。以下是利用Flink的 StreamGraph 通過(guò)低代碼的方式,來(lái)實(shí)現(xiàn)StreamGraph的生成,并最終實(shí)現(xiàn) Flink 程序零代碼開(kāi)發(fā)的解決方案。
一、Flink 相關(guān)概念
在Flink程序中,每個(gè)算子被稱作Operator,通過(guò)各個(gè)算子的處理最終得到期望的加工后數(shù)據(jù)。比如下面這段程序中,增加了Source, Fiter, Map, Sink 4個(gè)算子。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream dataStream = env.addSource(new FlinkKafkaConsumer("topic")); DataStream filteredStream = dataStream.filter(new FilterFunction() { @Override public boolean filter(Object value) throws Exception {return true;} }); DataStream mapedStream = filteredStream.map(new MapFunction() { @Override public Object map(Object value) throws Exception {return value;} }); mapedStream.addSink(new DiscardingSink()); env.execute("test-job");
StreamGraph
Flink的邏輯執(zhí)行圖,描述了整個(gè)流處理任務(wù)的流程和數(shù)據(jù)流轉(zhuǎn)遞規(guī)則,包括了數(shù)據(jù)源(Source)、轉(zhuǎn)換算子(Transform)、數(shù)據(jù)目的端(Sink)等元素,以及它們之間的依賴關(guān)系和傳輸規(guī)則。StreamGraph是通過(guò)Flink的API或者DSL來(lái)構(gòu)建的向無(wú)環(huán)圖(DAG),它與JobGraph之間是一一對(duì)應(yīng)的關(guān)系。StreamGraph中的頂點(diǎn)稱為streamNode,是用來(lái)表示Operator算子的類(lèi),包含了算子uid、并行度,是否共享slot(SlotSharingGroup)等信息。邊稱作streamEdge。通過(guò)StreamingJobGraphGenerator類(lèi)生成JobGraph。
JobGraph
StreamGraph 經(jīng)過(guò) flink-optimizer 模塊優(yōu)化后生成 JobGraph。生成 JobGraph 時(shí),會(huì)將多個(gè)滿足條件的算子chain 鏈接到一起作為一個(gè)頂點(diǎn)(JobVertex), 在運(yùn)行時(shí)對(duì)應(yīng)1個(gè) Task。Task 是 Flink 程序的基本執(zhí)行單元,任務(wù)調(diào)度時(shí)將Task分配到TaskManager上執(zhí)行。
ExecutionGraph
物理執(zhí)行圖是由JobGraph轉(zhuǎn)換而來(lái),描述了整個(gè)流處理任務(wù)的物理執(zhí)行細(xì)節(jié),包括了任務(wù)的調(diào)度、任務(wù)的執(zhí)行順序、任務(wù)之間的數(shù)據(jù)傳輸、任務(wù)的狀態(tài)管理等。Task會(huì)在步驟中拆分為多個(gè)SubTask。對(duì)應(yīng)Task中的每個(gè)并行度。
Physical Graph
PhysicalGraph是在執(zhí)行時(shí)的ExecutionGraph。ExecutionGraph中的每一個(gè)頂點(diǎn)ExecutionJobVertex都對(duì)應(yīng)一個(gè)或多個(gè)頂點(diǎn)ExecutionVertex,它們是物理執(zhí)行圖中的節(jié)點(diǎn)。
二、畫(huà)布模式實(shí)現(xiàn)思路
實(shí)現(xiàn)流程
首先,我們采用畫(huà)布模式(拖拉拽方式)來(lái)實(shí)現(xiàn)Flink程序的組裝,將極大程度上方便我們復(fù)用部分加工的算子,最終實(shí)現(xiàn)零代碼的Flink應(yīng)用開(kāi)發(fā)。我們通過(guò)繪圖的方式,直接將內(nèi)置的算子繪制在圖標(biāo)上。如下所示:
構(gòu)建有向無(wú)環(huán)圖(DAG),并持久化。通過(guò)拖拉拽的方式(畫(huà)布模式)構(gòu)建你的Flink應(yīng)用,后端的持久化存儲(chǔ)采用鄰接表方式。我們?cè)?mysql 關(guān)系數(shù)據(jù)庫(kù)中將 Node(算子:Source、Sink、中間加工邏輯算子)存儲(chǔ)到 flink_node 表中;將邊存到一張 flink_realation 表中。
重新組將Flink作業(yè)
要組裝以上畫(huà)布模式的Flink應(yīng)用,首先需要初始化好 StreamExecutionEnvironment 相關(guān)參數(shù),其次將上述表中的 flink_node 和flink_edge 轉(zhuǎn)化為DataStream,并將轉(zhuǎn)化出的 DataStream 合理地拼接成一個(gè) DataStream API Flink 應(yīng)用程序。
在將flink_node、flink_edge轉(zhuǎn)為為DataStream時(shí)選擇何種遍歷算法來(lái)組裝呢?我們知道有向無(wú)環(huán)圖的遍歷最常用的有:深度優(yōu)先遍歷(DFS)和廣度優(yōu)先遍歷(BFS)。這里我們采用了BFS算法+層序遍歷的方式,BFS便于在組裝的過(guò)程中將已visit到的node節(jié)點(diǎn)拼裝到其parent 的節(jié)點(diǎn)上。
總結(jié)
在實(shí)際的實(shí)現(xiàn)過(guò)程中,遇到的問(wèn)題往往比以上復(fù)雜很多。比如需要將更多的信息存儲(chǔ)在node節(jié)點(diǎn)和edge邊上。node上需要存儲(chǔ)并行度、算子處理前后的表schema等;edge需要存儲(chǔ)keyby的字段、上下游之間的數(shù)據(jù)shuffle的方式等等。此外在內(nèi)置的算子無(wú)法滿足用戶需求時(shí),還需要考慮如何友好的支持自定義算子(UDF)的嵌入等問(wèn)題。
審核編輯 黃宇
-
開(kāi)發(fā)
+關(guān)注
關(guān)注
0文章
370瀏覽量
40836 -
代碼
+關(guān)注
關(guān)注
30文章
4779瀏覽量
68524
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論