I. Concepts
As a framework for real-time processing of streaming data, Apache Storm's official site presents the following model:
In the figure, each "faucet" is a spout [spaʊt] (a water outlet), each "lightning bolt" is a bolt [boʊlt], and the arrows represent the flow of data. Together, the faucets, lightning bolts, and arrows form a directed acyclic graph called a Topology.
To process streaming data in real time with Storm, you write the processing logic for the "faucets" and "lightning bolts" and wire them together into a Topology that implements your real-time business logic.
Concretely:
(1) A real-time data source, such as Kafka, is connected to a Spout (the "faucet");
(2) The Spout reads the source data and continuously emits it to downstream Bolts; each unit of data emitted is called a Tuple;
(3) The Bolts process the incoming Tuples, transforming the data stream.
If this still sounds abstract, let's walk through the classic word-count example, using the file heart.txt:
Take me to your heart
Take me to your soul
Give me your hand and hold me
Show me what love is
Be my guiding star
It's easy take me to your heart
Standing on a mountain high
Looking at the moon through a clear blue sky
We can design a topology as follows:
WordSourceSpout: reads heart.txt and emits the data as a Stream, one line at a time; each line is a Tuple;
WordSplitBolt: splits each incoming Tuple into words and emits one Tuple per word to the next Bolt;
WordCountBolt: accumulates the count of each word.
II. Programming
1. How is a Topology built?
A Topology is built with TopologyBuilder, which provides the setSpout and setBolt methods for registering Spouts and Bolts. Both methods take three similar parameters; taking setSpout as an example, the first parameter is the component id (downstream components subscribe to this component's stream by that id), the second is the Spout or Bolt object that produces or processes the stream, and the third is the parallelism hint, i.e. how many tasks run concurrently to process the stream. Consider the following code:
TopologyBuilder topologyBuilder = new TopologyBuilder();
WordSourceSpout spout = new WordSourceSpout();
WordSplitBolt splitBolt = new WordSplitBolt();
WordCountBolt countBolt = new WordCountBolt();
topologyBuilder.setSpout("sentences", spout, 2);
topologyBuilder.setBolt("split", splitBolt, 8).shuffleGrouping("sentences");
topologyBuilder.setBolt("count", countBolt, 8).fieldsGrouping("split", new Fields("word"));
The code above defines two Bolts and the data-flow relationship between them: the first Bolt is registered under the id "split", and the second Bolt subscribes to that stream. countBolt uses fieldsGrouping() on the "word" field, so all Tuples carrying the same word are routed to the same task for counting.
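The article stops at configuring the builder. As a minimal sketch of how the finished topology might be submitted, assuming Storm 2.x (where LocalCluster is AutoCloseable) and the component classes sketched further below; the class name WordCountTopology and the topology name "word-count" are purely illustrative:

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("sentences", new WordSourceSpout(), 2);
        topologyBuilder.setBolt("split", new WordSplitBolt(), 8).shuffleGrouping("sentences");
        topologyBuilder.setBolt("count", new WordCountBolt(), 8).fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);
        // In-process cluster for local testing; on a real cluster you would package a jar
        // and call StormSubmitter.submitTopology(...) instead.
        try (LocalCluster cluster = new LocalCluster()) {
            cluster.submitTopology("word-count", conf, topologyBuilder.createTopology());
            Thread.sleep(10_000);   // let the topology run briefly before the cluster shuts down
        }
    }
}

In local mode the topology runs inside the current JVM, which is convenient for debugging before deploying to a real cluster.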
2. How are a Spout and a Bolt defined?
In the programming model, Spouts and Bolts are both called Components.
WordSourceSpout needs to extend BaseRichSpout, whose class hierarchy is:
BaseRichSpout --extends-- BaseComponent --implements-- IComponent
BaseRichSpout --implements-- IRichSpout --extends-- ISpout
The ISpout interface is defined as:
public interface ISpout extends Serializable {
/**
* Called when a task for this component is initialized within a worker on the cluster. It provides the spout with the environment in
* which the spout executes.
*
* This includes the:
*
* @param conf The Storm configuration for this spout. This is the configuration provided to the topology merged in with cluster
* configuration on this machine.
* @param context This object can be used to get information about this task's place within the topology, including the task id and
* component id of this task, input and output information, etc.
* @param collector The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and
* close methods. The collector is thread-safe and should be saved as an instance variable of this spout object.
*/
void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector);
/**
* Called when an ISpout is going to be shutdown. There is no guarantee that close will be called, because the supervisor kill -9's
* worker processes on the cluster.
*
* The one context where close is guaranteed to be called is when a topology is killed when running Storm in local mode.
*/
void close();
/**
* Called when a spout has been activated out of a deactivated mode. nextTuple will be called on this spout soon. A spout can become
* activated after having been deactivated when the topology is manipulated using the `storm` client.
*/
void activate();
/**
* Called when a spout has been deactivated. nextTuple will not be called while a spout is deactivated. The spout may or may not be
* reactivated in the future.
*/
void deactivate();
/**
* When this method is called, Storm is requesting that the Spout emit tuples to the output collector. This method should be
* non-blocking, so if the Spout has no tuples to emit, this method should return. nextTuple, ack, and fail are all called in a tight
* loop in a single thread in the spout task. When there are no tuples to emit, it is courteous to have nextTuple sleep for a short
* amount of time (like a single millisecond) so as not to waste too much CPU.
*/
void nextTuple();
/**
* Storm has determined that the tuple emitted by this spout with the msgId identifier has been fully processed. Typically, an
* implementation of this method will take that message off the queue and prevent it from being replayed.
*/
void ack(Object msgId);
/**
* The tuple emitted by this spout with the msgId identifier has failed to be fully processed. Typically, an implementation of this
* method will put that message back on the queue to be replayed at a later time.
*/
void fail(Object msgId);
}
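The original article does not show the spout's implementation. Below is a hedged sketch of what WordSourceSpout could look like, assuming Storm 2.x APIs and that heart.txt sits in the working directory; it loads the file in open() and emits one line per nextTuple() call:

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class WordSourceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private List<String> lines;   // lines of heart.txt, loaded once in open()
    private int index = 0;

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;   // keep the collector so nextTuple() can emit
        this.lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new FileReader("heart.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (Exception e) {
            throw new RuntimeException("failed to read heart.txt", e);
        }
    }

    @Override
    public void nextTuple() {
        if (index < lines.size()) {
            collector.emit(new Values(lines.get(index++)));   // one line per call
        } else {
            Utils.sleep(1);   // nothing left to emit; avoid busy-spinning the spout thread
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));   // each tuple carries a single "sentence" field
    }
}

Note that with a parallelism hint of 2, each spout task would read and emit the whole file; that is fine for a demo, but a real source (Kafka, a message queue) would partition the data across tasks.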
WordCountBolt needs to extend BaseBasicBolt, whose class hierarchy is:
BaseBasicBolt --extends-- BaseComponent --implements-- IComponent
BaseBasicBolt --implements-- IBasicBolt --extends-- IComponent
The IBasicBolt interface is defined as:
public interface IBasicBolt extends IComponent {
void prepare(Map<String, Object> topoConf, TopologyContext context);
/**
* Process the input tuple and optionally emit new tuples based on the input tuple.
*
* All acking is managed for you. Throw a FailedException if you want to fail the tuple.
*/
void execute(Tuple input, BasicOutputCollector collector);
void cleanup();
}
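Again as a hedged sketch (not shown in the original article), the two bolts might look like the following, assuming the "sentence" and "word" field names used above; in a real project each public class would live in its own file:

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Splits each incoming sentence into words and emits one tuple per word.
public class WordSplitBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getStringByField("sentence");
        for (String word : sentence.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                collector.emit(new Values(word));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

// Keeps a running count per word. fieldsGrouping("split", new Fields("word")) guarantees
// that the same word always reaches the same task, so a local HashMap is sufficient.
public class WordCountBolt extends BaseBasicBolt {
    private Map<String, Integer> counts;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context) {
        counts = new HashMap<>();   // initialized here, after the bolt is deserialized on the worker
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("word");
        int count = counts.merge(word, 1, Integer::sum);
        collector.emit(new Values(word, count));   // emit the updated count downstream (or log it)
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}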
The IComponent interface is defined as:
/**
* Common methods for all possible components in a topology. This interface is used when defining topologies using the Java API.
*/
public interface IComponent extends Serializable {
/**
* Declare the output schema for all the streams of this topology.
*
* @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
*/
void declareOutputFields(OutputFieldsDeclarer declarer);
/**
* Declare configuration specific to this component. Only a subset of the "topology.*" configs can be overridden. The component
* configuration can be further overridden when constructing the topology using {@link TopologyBuilder}
*/
Map<String, Object> getComponentConfiguration();
}
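declareOutputFields is already illustrated in the sketches above. getComponentConfiguration lets a single component override a subset of the topology.* settings just for itself; one common use (shown here only as a hypothetical example, not taken from the article) is requesting periodic tick tuples via Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS:

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

// Illustrative bolt that asks Storm to send it a "tick" tuple every 10 seconds.
public class TickAwareBolt extends BaseBasicBolt {
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // a real bolt would distinguish tick tuples (e.g. with TupleUtils.isTick(input))
        // from normal data tuples and flush or aggregate accordingly
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this illustrative bolt declares no output streams
    }
}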
The basic logic of the Storm framework is:
A Spout component initializes its SpoutOutputCollector (the spout's output collector) in the open method. When Storm calls the spout's nextTuple method to request a tuple, the spout emits one via the SpoutOutputCollector's emit method. A Bolt component receives the tuple in its execute method and processes the data.