色哟哟视频在线观看-色哟哟视频在线-色哟哟欧美15最新在线-色哟哟免费在线观看-国产l精品国产亚洲区在线观看-国产l精品国产亚洲区久久

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

Apache Storm是什么

汽車電子技術 ? 來源:碼農與軟件時代 ? 作者:碼農與軟件時代 ? 2023-02-20 15:34 ? 次閱讀

一、概念

Apache Storm作為流數據的實時處理框架,官網給出了如下模型:

圖片

圖中“水龍頭”便是spout [spa?t] 出水管,“閃電”便是bolt [b??lt],“箭頭”表達的是數據的流轉,“水龍頭”、“閃電”和“箭頭”組成的有向無環圖稱為Topology(拓撲)。

使用Storm框架進行流數據的實時處理,就需要編寫“水龍頭”和“閃電”的處理邏輯,并將它們通過Topology串接在一起,構建實時處理的業務邏輯。

具體的做法是:

(1)實時數據源,如kafka,接入到“水龍頭”Spout中;

(2)Spout讀取源數據并不斷地發出數據到后續Bolt中,這些數據稱為Tuple(元組);

(3)Bolt對發送過來的數據Tuple進行處理,完成數據流轉換;

讀到這里,可能還是很迷惑,我們以常見的示例統計詞頻(heart.txt)來進行說明:

Take me to your heart
Take me to your soul
Give me your hand and hold me
Show me what love is
Be my guiding star
It's easy take me to your heart
Standing on a mountain high
Looking at the moon through a clear blue sky

我們可以設計一個topology:

WordSourceSpout:讀取heart.txt,并逐行發送數據流Stream,每行即為一個Tuple;
WordSplitBolt:拆分Tuple,并將單詞Tuple發出到下個Bolt;
WordCountBolt:對單詞的頻率進行累加計算;

二、編程

1.Topology是如何構建的?

Topology是通過TopologyBuilder來構建的,提供setSpout和setBolt方法來配置Spout和Bolt,這兩個方法都具有3個參數,比較類似,以setSpout為例,第1個參數表示Stream的名稱,第2個參數表示stream的處理對象,第3個參數表示并發數,也就是同時運行多少個任務來處理Stream。先來看一段代碼:

TopologyBuilder topologyBuilder = new TopologyBuilder();
WordSourceSpout spout = new WordSourceSpout();
WordSplitBolt splitBlot = new WordSplitBolt();
WordCountBolt countBlot = new WordCountBolt();
topologyBuilder.setSpout("sentences", spout, 2);
topologyBuilder.setBolt("split",splitBlot , 8).shuffleGrouping("sentences");
topologyBuilder.setBolt("count",countBlot , 8).fieldGrouping("split",new Fields(“word”));

上面定義了兩個Bolt,它們之間數據流的關聯關系:第1個Bolt聲明其輸出Stream的名稱為split,而第2個Bolt訂閱的Stream為split。countBlot 通過fieldGroupings()在word上具有相同字段的所有Tuple發送到同一個任務中進行統計。

2.Spout和Bolt是如何定義的?

編程模型中,Spout和Bolt都稱為組件Component。

WordSourceSpout 需要繼承BaseRichSpout,其類結構關系為:

BaseRichSpout--繼承--BaseComponent--實現--IComponent
BaseRichSpout--實現--IRichSpout--實現--ISpout

ISpout接口的定義為:

public interface ISpout extends Serializable {
    /**
     * Called when a task for this component is initialized within a worker on the cluster. It provides the spout with the environment in
     * which the spout executes.
     *
     * 

This includes the: * * @param conf The Storm configuration for this spout. This is the configuration provided to the topology merged in with cluster * configuration on this machine. * @param context This object can be used to get information about this task's place within the topology, including the task id and * component id of this task, input and output information, etc. * @param collector The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and * close methods. The collector is thread-safe and should be saved as an instance variable of this spout object. */ void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector); /** * Called when an ISpout is going to be shutdown. There is no guarentee that close will be called, because the supervisor kill -9's * worker processes on the cluster. * *

The one context where close is guaranteed to be called is a topology is killed when running Storm in local mode. */ void close(); /** * Called when a spout has been activated out of a deactivated mode. nextTuple will be called on this spout soon. A spout can become * activated after having been deactivated when the topology is manipulated using the `storm` client. */ void activate(); /** * Called when a spout has been deactivated. nextTuple will not be called while a spout is deactivated. The spout may or may not be * reactivated in the future. */ void deactivate(); /** * When this method is called, Storm is requesting that the Spout emit tuples to the output collector. This method should be * non-blocking, so if the Spout has no tuples to emit, this method should return. nextTuple, ack, and fail are all called in a tight * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous to have nextTuple sleep for a short * amount of time (like a single millisecond) so as not to waste too much CPU. */ void nextTuple(); /** * Storm has determined that the tuple emitted by this spout with the msgId identifier has been fully processed. Typically, an * implementation of this method will take that message off the queue and prevent it from being replayed. */ void ack(Object msgId); /** * The tuple emitted by this spout with the msgId identifier has failed to be fully processed. Typically, an implementation of this * method will put that message back on the queue to be replayed at a later time. */ void fail(Object msgId); }

WordCountBolt需要繼承BaseBasicBolt,其類結構關系為:

BaseBasicBolt--繼承--BaseComponent--實現--IBasicBolt--IComponent

IBasicBolt接口的定義為:

public interface IBasicBolt extends IComponent {
    void prepare(Map topoConf, TopologyContext context);


    /**
     * Process the input tuple and optionally emit new tuples based on the input tuple.
     *
     * 

All acking is managed for you. Throw a FailedException if you want to fail the tuple. */ void execute(Tuple input, BasicOutputCollector collector); void cleanup(); }

IComponent接口的定義:

/**
 * Common methods for all possible components in a topology. This interface is used when defining topologies using the Java API.
 */
public interface IComponent extends Serializable {


    /**
     * Declare the output schema for all the streams of this topology.
     *
     * @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
     */
    void declareOutputFields(OutputFieldsDeclarer declarer);


    /**
     * Declare configuration specific to this component. Only a subset of the "topology.*" configs can be overridden. The component
     * configuration can be further overridden when constructing the topology using {@link TopologyBuilder}
     */
    Map<String, Object> getComponentConfiguration();


}

Storm框架基本邏輯為:

Spout組件通過Open方法進行SpoutOutputCollector(Spout輸出收集器)的初始化,Storm調用Spout組件的nextTuple方法請求tuple時,便通過SpoutOutputCollector的emit方法發送一個tuple。Bolt組件通過execute方法接收到tuple,并對tuple進行數據處理。

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 邏輯
    +關注

    關注

    2

    文章

    833

    瀏覽量

    29464
  • 編寫
    +關注

    關注

    0

    文章

    29

    瀏覽量

    8442
  • Storm框架
    +關注

    關注

    0

    文章

    3

    瀏覽量

    1527
收藏 人收藏

    評論

    相關推薦

    Storm使用場景

    Storm基礎(一):架構和組件
    發表于 06-11 16:37

    Storm高階之Trident的全面介紹

    Storm高階(一):Trident
    發表于 07-29 10:18

    雅虎機器學習平臺CaffeOnSpark解讀

    Andy Feng是Apache Storm的Committer,同時也是雅虎公司負責大數據與機器學習平臺的副總裁。他帶領雅虎機器學習團隊基于開源的Spark和Caffe開發了深度學習框架
    發表于 10-10 11:46 ?0次下載
    雅虎機器學習平臺CaffeOnSpark解讀

    怎樣在Docker Swarm上部署Apache Storm

    本文是一篇來源于Baqend Tech博客的客座轉貼,描述了如何在Docker Swarm,而不是在虛擬機上部署和調配Apache Storm集群。這個題目很有意思,Wolfram
    發表于 10-10 14:24 ?0次下載
    怎樣在Docker Swarm上部署<b class='flag-5'>Apache</b> <b class='flag-5'>Storm</b>

    如何利用Storm完成實時分析處理數據

    Storm本身是Apache托管的開源的分布式實時計算系統,它的前身是Twitter Storm。在Storm問世以前,處理海量的實時數據信息,大部分是類似于使用消息隊列,加上工作進程
    發表于 04-26 15:30 ?8084次閱讀
    如何利用<b class='flag-5'>Storm</b>完成實時分析處理數據

    Storm環境下基于權重的任務調度算法

    大數據流式計算平臺Apache Storm默認采用輪詢的方式進行任務調度,未考慮到拓撲中各任務計算開銷的差異以及任務之間不同類型的通信模式,在負載均衡和通信開銷方面存在較大的優化空間。針對這一
    發表于 04-17 10:52 ?0次下載
    <b class='flag-5'>Storm</b>環境下基于權重的任務調度算法

    探討Apache kafka在部署可伸縮物聯網解決方案中所扮演的角色

    Apache stormApache spark和Apache hadoop集群提供支持的數據處理管道的網關。
    發表于 07-21 09:37 ?577次閱讀

    一種基于Apache Storm的增量式FFT方法

    針對傳統單機版批處理式的快速傅里葉變換( Fast fourier transfor,FFT)難以滿足工業生產現場海量流數據實時處理的需求,提出一種基于Δ pache Storm的增量式FFT方法
    發表于 04-28 14:44 ?10次下載
    一種基于<b class='flag-5'>Apache</b> <b class='flag-5'>Storm</b>的增量式FFT方法

    Apache與Weblogic的整合

    Apache與Weblogic的整合(電源技術論文錄用后可以改作者嗎)-Apache與Weblogic的整合? ? ? ? ? ? ? ? ? ? ? ?
    發表于 08-31 11:24 ?7次下載
    <b class='flag-5'>Apache</b>與Weblogic的整合

    Linux的apache

    Linux的apache(ups電源技術轉讓)-Linux的apache,有需要的可以參考!
    發表于 08-31 16:17 ?1次下載
    Linux的<b class='flag-5'>apache</b>

    Storm-Engine基于C++的開源游戲引擎

    ./oschina_soft/storm-engine.zip
    發表于 06-16 10:05 ?0次下載
    <b class='flag-5'>Storm</b>-Engine基于C++的開源游戲引擎

    Apache Doris正式成為 Apache 頂級項目

    全球最大的開源軟件基金會 Apache 軟件基金會(以下簡稱 Apache)于美國時間 2022 年?6 月 16 日宣布,Apache Doris 成功從 Apache 孵化器畢業,
    的頭像 發表于 06-17 14:08 ?1005次閱讀

    Apache Storm的安裝部署

    Storm是一個免費開源的分布式實時計算系統。分布式意味著Storm是一個集群,部署在多臺機器上。實時便是實時計算,相比于MapReduce的批處理,實時更關注于數據處理的速度和延時。
    的頭像 發表于 02-20 15:41 ?959次閱讀
    <b class='flag-5'>Apache</b> <b class='flag-5'>Storm</b>的安裝部署

    大數據平臺有哪些 大數據技術應用有哪些

    。   2. 實時數據處理平臺:Apache Kafka、Apache StormApache Ignite等,專注于實時數據處理和流計算,適用于流媒體、監控和物聯網等場景。
    的頭像 發表于 04-16 16:14 ?1.3w次閱讀

    什么是Apache日志?Apache日志分析工具介紹

    Apache Web 服務器在企業中廣泛用于托管其網站和 Web 應用程序,Apache 服務器生成的原始日志提供有關 Apache 服務器托管的網站如何處理用戶請求以及訪問您的網站時經常遇到的錯誤的重要信息。
    的頭像 發表于 01-04 10:09 ?832次閱讀
    主站蜘蛛池模板: 久久日本片精品AAAAA国产| 免费a视频在线观看| 蜜臀久久99精品久久久久久做爰 | 午夜看片a福利在线观看| 亚洲欧美一区二区三区导航| 3344永久在线观看视频免费| 动漫美女人物被黄漫在线看| 久爱精品亚洲电影午夜| 日本强好片久久久久久AAA| 亚洲日本天堂在线| 被老总按在办公桌吸奶头| 狠狠色香婷婷久久亚洲精品| 青青伊人国产| 一本大道无码AV天堂欧美| 春水福利app导航| 久久精品一区二区三区资源网 | 一级特黄视频| 夫妻主vk| 么公在浴室了我的奶| 午夜性色一区二区三区不卡视频| 98久久人妻少妇激情啪啪| 国产亚洲欧美ai在线看片| 秋霞在线看片无码免费| 在线观看黄色小说| 国产精品午夜福利在线观看| 嗯好大好猛皇上好深用力| 亚洲欧美日韩高清专区| 福利一区国产| 女人高潮时一吸一夹| 一本道中文无码亚洲| 国产精品爽黄69天堂A片| 欧美 另类 美腿 亚洲 无码 | 国产精品99久久久久久人韩国| 伦理 电影在线观看| 亚洲成色WWW久久网站夜月| 成人毛片免费播放| 美女挑战50厘米长的黑人| 亚洲日本欧美日韩高观看| 国产日韩欧美三级| 双性将军粗壮H灌满怀孕| 99久久无码一区人妻A片蜜|