色哟哟视频在线观看-色哟哟视频在线-色哟哟欧美15最新在线-色哟哟免费在线观看-国产l精品国产亚洲区在线观看-国产l精品国产亚洲区久久

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

怎樣去解決Kafka消息重復(fù)的問題呢?

jf_ro2CN3Fa ? 來源:稀土掘金 ? 2023-02-12 14:18 ? 次閱讀

一、前言

數(shù)據(jù)重復(fù)這個問題其實(shí)也是挺正常,全鏈路都有可能會導(dǎo)致數(shù)據(jù)重復(fù)。

00fc324a-a28b-11ed-bfe3-dac502259ad0.jpg

通常,消息消費(fèi)時候都會設(shè)置一定重試次數(shù)來避免網(wǎng)絡(luò)波動造成的影響,同時帶來副作用是可能出現(xiàn)消息重復(fù)。

整理下消息重復(fù)的幾個場景:

生產(chǎn)端: 遇到異常,基本解決措施都是 重試 。

場景一:leader分區(qū)不可用了,拋 LeaderNotAvailableException 異常,等待選出新 leader 分區(qū)。

場景二:Controller 所在 Broker 掛了,拋 NotControllerException 異常,等待 Controller 重新選舉。

場景三:網(wǎng)絡(luò)異常、斷網(wǎng)、網(wǎng)絡(luò)分區(qū)、丟包等,拋 NetworkException 異常,等待網(wǎng)絡(luò)恢復(fù)。

消費(fèi)端: poll 一批數(shù)據(jù),處理完畢還沒提交 offset ,機(jī)子宕機(jī)重啟了,又會 poll 上批數(shù)據(jù),再度消費(fèi)就造成了消息重復(fù)。

怎么解決?

先來了解下消息的三種投遞語義:

最多一次( at most once): 消息只發(fā)一次,消息可能會丟失,但絕不會被重復(fù)發(fā)送。例如:mqtt 中 QoS = 0。

至少一次( at least once): 消息至少發(fā)一次,消息不會丟失,但有可能被重復(fù)發(fā)送。例如:mqtt 中 QoS = 1

精確一次( exactly once): 消息精確發(fā)一次,消息不會丟失,也不會被重復(fù)發(fā)送。例如:mqtt 中 QoS = 2。

了解了這三種語義,再來看如何解決消息重復(fù),即如何實(shí)現(xiàn)精準(zhǔn)一次,可分為三種方法:

Kafka 冪等性 Producer: 保證生產(chǎn)端發(fā)送消息冪等。局限性,是只能保證單分區(qū)且單會話(重啟后就算新會話)

Kafka 事務(wù): 保證生產(chǎn)端發(fā)送消息冪等。解決冪等 Producer 的局限性。

消費(fèi)端冪等:保證消費(fèi)端接收消息冪等。蔸底方案。

1)Kafka 冪等性 Producer

冪等性指 :無論執(zhí)行多少次同樣的運(yùn)算,結(jié)果都是相同的。即一條命令,任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。

冪等性使用示例:在生產(chǎn)端添加對應(yīng)配置即可

Propertiesprops=newProperties();
props.put("enable.idempotence",ture);//1.設(shè)置冪等
props.put("acks","all");//2.當(dāng)enable.idempotence為true,這里默認(rèn)為all
props.put("max.in.flight.requests.per.connection",5);//3.注意

設(shè)置冪等,啟動冪等。

配置 acks,注意:一定要設(shè)置 acks=all,否則會拋異常。

配置 max.in.flight.requests.per.connection 需要 <= 5 ,否則會拋異常 OutOfOrderSequenceException。

0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1

Kafka >= 1.1, max.in.flight.request.per.connection <= 5

[**為了更好理解,需要了解下 Kafka 冪等機(jī)制:]

010c3212-a28b-11ed-bfe3-dac502259ad0.jpg

Producer 每次啟動后,會向 Broker 申請一個全局唯一的 pid。(重啟后 pid 會變化,這也是弊端之一)

Sequence Numbe:針對每個 都對應(yīng)一個從0開始單調(diào)遞增的 Sequence,同時 Broker端會緩存這個 seq num

判斷是否重復(fù): 去 Broker 里對應(yīng)的隊(duì)列 ProducerStateEntry.Queue(默認(rèn)隊(duì)列長度為 5)查詢是否存在

如果 nextSeq == lastSeq + 1,即 服務(wù)端seq + 1 == 生產(chǎn)傳入seq,則接收。

如果 nextSeq == 0 && lastSeq == Int.MaxValue,即剛初始化,也接收。

反之,要么重復(fù),要么丟消息,均拒絕。

011a794e-a28b-11ed-bfe3-dac502259ad0.jpg

這種設(shè)計針對解決了兩個問題:

消息重復(fù): 場景 Broker 保存消息后還沒發(fā)送 ack 就宕機(jī)了,這時候 Producer 就會重試,這就造成消息重復(fù)。

消息亂序: 避免場景,前一條消息發(fā)送失敗而其后一條發(fā)送成功,前一條消息重試后成功,造成的消息亂序。

那什么時候該使用冪等:

如果已經(jīng)使用 acks=all,使用冪等也可以。

如果已經(jīng)使用 acks=0 或者 acks=1,說明你的系統(tǒng)追求高性能,對數(shù)據(jù)一致性要求不高。不要使用冪等。

2)Kafka 事務(wù)

使用 Kafka 事務(wù)解決冪等的弊端:單會話且單分區(qū)冪等。

Tips: 這塊篇幅較長,這先稍微提及下使用,之后另起一篇。

事務(wù)使用示例:分為生產(chǎn)端 和 消費(fèi)端

Propertiesprops=newProperties();
props.put("enable.idempotence",ture);//1.設(shè)置冪等
props.put("acks","all");//2.當(dāng)enable.idempotence為true,這里默認(rèn)為all
props.put("max.in.flight.requests.per.connection",5);//3.最大等待數(shù)
props.put("transactional.id","my-transactional-id");//4.設(shè)定事務(wù)id

Producerproducer=newKafkaProducer(props);

//初始化事務(wù)
producer.initTransactions();

try{
//開始事務(wù)
producer.beginTransaction();

//發(fā)送數(shù)據(jù)
producer.send(newProducerRecord("Topic","Key","Value"));

//數(shù)據(jù)發(fā)送及Offset發(fā)送均成功的情況下,提交事務(wù)
producer.commitTransaction();
}catch(ProducerFencedException|OutOfOrderSequenceException|AuthorizationExceptione){
//數(shù)據(jù)發(fā)送或者Offset發(fā)送出現(xiàn)異常時,終止事務(wù)
producer.abortTransaction();
}finally{
//關(guān)閉Producer和Consumer
producer.close();
consumer.close();
}

這里消費(fèi)端 Consumer 需要設(shè)置下配置:isolation.level 參數(shù)

read_uncommitted: 這是默認(rèn)值,表明 Consumer 能夠讀取到 Kafka 寫入的任何消息,不論事務(wù)型 Producer 提交事務(wù)還是終止事務(wù),其寫入的消息都可以讀取。

如果你用了事務(wù)型 Producer,那么對應(yīng)的 Consumer 就不要使用這個值。

read_committed: 表明 Consumer 只會讀取事務(wù)型 Producer 成功提交事務(wù)寫入的消息。當(dāng)然了,它也能看到非事務(wù)型 Producer 寫入的所有消息。

3)消費(fèi)端冪等

“如何解決消息重復(fù)?” 這個問題,其實(shí)換一種說法:就是如何解決消費(fèi)端冪等性問題。

只要消費(fèi)端具備了冪等性,那么重復(fù)消費(fèi)消息的問題也就解決了。

典型的方案是使用:消息表,來去重:

0129106c-a28b-11ed-bfe3-dac502259ad0.jpg

上述栗子中,消費(fèi)端拉取到一條消息后,開啟事務(wù),將消息Id 新增到本地消息表中,同時更新訂單信息。

如果消息重復(fù),則新增操作 insert 會異常,同時觸發(fā)事務(wù)回滾。

二、案例:Kafka 冪等性 Producer 使用

準(zhǔn)備工作如下:

1、Zookeeper:本地使用 Docker 啟動

$dockerrun-d--namezookeeper-p2181:2181zookeeper
a86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4

2、Kafka:版本 2.7.1,源碼編譯啟動(看上文源碼搭建啟動)

3、啟動生產(chǎn)者:Kafka 源碼中 exmaple 中

4、啟動消息者:可以用 Kafka 提供的腳本

>基于SpringCloudAlibaba+Gateway+Nacos+RocketMQ+Vue&Element實(shí)現(xiàn)的后臺管理系統(tǒng)+用戶小程序,支持RBAC動態(tài)權(quán)限、多租戶、數(shù)據(jù)權(quán)限、工作流、三方登錄、支付、短信、商城等功能
>
>*項(xiàng)目地址:
>*視頻教程

#舉個栗子:topic 需要自己去修改
$cd./kafka-2.7.1-src/bin
$./kafka-console-producer.sh--broker-listlocalhost:9092--topictest_topic

創(chuàng)建 topic : 1副本,2 分區(qū)

$./kafka-topics.sh--bootstrap-serverlocalhost:9092--topicmyTopic--create--replication-factor1--partitions2

#查看
$./kafka-topics.sh--bootstrap-serverbroker:9092--topicmyTopic--describe

生產(chǎn)者代碼:

013daf5e-a28b-11ed-bfe3-dac502259ad0.jpg

publicclassKafkaProducerApplication{

privatefinalProducerproducer;
finalStringoutTopic;

publicKafkaProducerApplication(finalProducerproducer,
finalStringtopic){
this.producer=producer;
outTopic=topic;
}

publicvoidproduce(finalStringmessage){
finalString[]parts=message.split("-");
finalStringkey,value;
if(parts.length>1){
key=parts[0];
value=parts[1];
}else{
key=null;
value=parts[0];
}
finalProducerRecordproducerRecord
=newProducerRecord<>(outTopic,key,value);
producer.send(producerRecord,
(recordMetadata,e)->{
if(e!=null){
e.printStackTrace();
}else{
System.out.println("key/value"+key+"/"+value+"	writtentotopic[partition]"+recordMetadata.topic()+"["+recordMetadata.partition()+"]atoffset"+recordMetadata.offset());
}
}
);
}

publicvoidshutdown(){
producer.close();
}

publicstaticvoidmain(String[]args){

finalPropertiesprops=newProperties();

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
props.put(ProducerConfig.ACKS_CONFIG,"all");

props.put(ProducerConfig.CLIENT_ID_CONFIG,"myApp");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);

finalStringtopic="myTopic";
finalProducerproducer=newKafkaProducer<>(props);
finalKafkaProducerApplicationproducerApp=newKafkaProducerApplication(producer,topic);

StringfilePath="/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";
try{
ListlinesToProduce=Files.readAllLines(Paths.get(filePath));
linesToProduce.stream().filter(l->!l.trim().isEmpty())
.forEach(producerApp::produce);
System.out.println("Offsetsandtimestampscommittedinbatchfrom"+filePath);
}catch(IOExceptione){
System.err.printf("Errorreadingfile%sdueto%s%n",filePath,e);
}finally{
producerApp.shutdown();
}
}
}

啟動生產(chǎn)者后,控制臺輸出如下:

014ce7da-a28b-11ed-bfe3-dac502259ad0.jpg

啟動消費(fèi)者:

$./kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topicmyTopic
015c4680-a28b-11ed-bfe3-dac502259ad0.jpg

修改配置 acks

``

啟用冪等的情況下,調(diào)整 acks 配置,生產(chǎn)者啟動后結(jié)果是怎樣的:

修改配置 acks = 1

修改配置 acks = 0

會直接報錯:

Exceptioninthread"main"org.apache.kafka.common.config.ConfigException:Mustsetackstoallinordertousetheidempotentproducer.
Otherwisewecannotguaranteeidempotence.
0173c8b4-a28b-11ed-bfe3-dac502259ad0.jpg

修改配置 max.in.flight.requests.per.connection

``

啟用冪等的情況下,調(diào)整此配置,結(jié)果是怎樣的:

將 max.in.flight.requests.per.connection > 5 會怎樣?

0182e556-a28b-11ed-bfe3-dac502259ad0.jpg

當(dāng)然會報錯:

Causedby:org.apache.kafka.common.config.ConfigException:Mustsetmax.in.flight.requests.per.connectiontoatmost5tousetheidempotentproducer.
01987f24-a28b-11ed-bfe3-dac502259ad0.jpg






審核編輯:劉清

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • MQTT協(xié)議
    +關(guān)注

    關(guān)注

    0

    文章

    97

    瀏覽量

    5428
  • kafka
    +關(guān)注

    關(guān)注

    0

    文章

    51

    瀏覽量

    5225

原文標(biāo)題:一碰就頭疼的 Kafka 消息重復(fù)問題,立馬解決!

文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏

    評論

    相關(guān)推薦

    在Boost電源中該怎樣選擇電容的型號和電容容量?

    我們之前了解過電容的作用,不外乎儲能、濾波等作用。那么在Boost電源中又該怎樣選擇電容的型號和電容容量?
    發(fā)表于 08-14 15:44 ?3089次閱讀
    在Boost電源中該<b class='flag-5'>怎樣</b><b class='flag-5'>去</b>選擇電容的型號和電容容量<b class='flag-5'>呢</b>?

    kafka數(shù)據(jù)可靠性深度解讀

    At least once,可以保證不丟,但是可能會重復(fù),為了解決重復(fù)需要引入唯一標(biāo)識和重機(jī)制,kafka提供了GUID實(shí)現(xiàn)了唯一標(biāo)識,但是并沒有提供自帶的
    發(fā)表于 05-08 16:29

    基于閃存存儲的Apache Kafka性能提升方法

    據(jù)生態(tài)系統(tǒng)中最常用的分布式消息傳遞系統(tǒng)之一的Apache Kafka進(jìn)行評估,測試如何以最佳方式將美光固態(tài)存儲應(yīng)用于 Apache Kafka,以及將產(chǎn)生怎樣的收益。A
    發(fā)表于 07-24 06:58

    Kafka集群環(huán)境的搭建

    :2181,zk02:2181,zk03:2181注意:broker.id安裝集群服務(wù)個數(shù)編排即可,集群下不能重復(fù)。5、啟動kafka集群# 啟動命令[root@node02 kafka2.11]# bin
    發(fā)表于 01-05 17:55

    怎樣設(shè)置數(shù)值元件的格式

    怎樣設(shè)置數(shù)值元件怎樣設(shè)置數(shù)值元件的格式?
    發(fā)表于 09-26 09:16

    怎樣獲取Android的電池電壓

    怎樣獲取Android的電池電壓?怎樣獲取Android的電池電流?
    發(fā)表于 10-09 08:39

    怎樣使用springboot

    怎樣使用springboot?學(xué)習(xí)springboot需要懂得哪些?
    發(fā)表于 10-25 07:13

    怎樣使用HSE/HSI配置RCC的時鐘

    怎樣使用HSE/HSI配置RCC的時鐘?怎樣設(shè)置系統(tǒng)時鐘的庫函數(shù)
    發(fā)表于 11-10 07:08

    怎樣配置設(shè)備樹的leds節(jié)點(diǎn)

    配置設(shè)備樹leds節(jié)點(diǎn),sys文件系統(tǒng)中沒有出現(xiàn)相應(yīng)設(shè)備文件,引腳沒有查出有重復(fù)定義的?怎樣配置設(shè)備樹的leds節(jié)點(diǎn)
    發(fā)表于 01-07 06:15

    socket通信該怎樣實(shí)現(xiàn)

    socket通信該怎樣實(shí)現(xiàn)?怎樣實(shí)現(xiàn)socket AES-CBC加密?
    發(fā)表于 01-20 07:41

    怎樣配置Android的SDIO部分

    怎樣配置Android的電源部分?怎樣配置Android的SDIO部分
    發(fā)表于 02-10 07:00

    Ubuntu固件的編譯該怎樣使用

    怎樣編譯Ubuntu固件?Ubuntu固件的編譯該怎樣使用?
    發(fā)表于 02-15 06:18

    怎樣寫回調(diào)函數(shù)?怎樣使用回調(diào)函數(shù)

    回調(diào)函數(shù)的作用是什么?單片機(jī)怎么用回調(diào)函數(shù)在不同文件之間傳遞數(shù)據(jù)怎樣寫回調(diào)函數(shù)?怎樣使
    發(fā)表于 02-23 07:40

    Kafka的概念及Kafka的宕機(jī)

    問題要從一次Kafka的宕機(jī)開始說起。 筆者所在的是一家金融科技公司,但公司內(nèi)部并沒有采用在金融支付領(lǐng)域更為流行的 RabbitMQ ,而是采用了設(shè)計之初就為日志處理而生的 Kafka ,所以我一直
    的頭像 發(fā)表于 08-27 11:21 ?2112次閱讀
    <b class='flag-5'>Kafka</b>的概念及<b class='flag-5'>Kafka</b>的宕機(jī)

    怎樣減少Confluent Cloud Kafka運(yùn)營成本

    流式數(shù)據(jù)已成為企業(yè)構(gòu)建和運(yùn)營出色數(shù)據(jù)產(chǎn)品的必要條件,而 Apache Kafka 已成為實(shí)時流式傳輸?shù)臉?biāo)準(zhǔn)。
    的頭像 發(fā)表于 09-23 17:23 ?868次閱讀
    主站蜘蛛池模板: 免费观看美女的网站| 久久re视频这里精品09首页| 亚洲视频在线免费观看| 色偷偷888欧美精品久久久| 99爱在线精品视频免费观看9| 亚洲色图19p| 亚洲AV无码乱码A片无码蜜桃| 婷婷综合久久狠狠色| 色婷婷我要去我去也| 日韩人妻无码精品-专区| 九九热在线视频观看这里只有精品| 91偷偷久久做嫩草电影院| 永久精品视频无码一区| 一本道久在线综合色色| 亚洲香蕉视频在线播放| 亚洲一级特黄| 在线观看国产区| 91精品福利一区二区| 91系列在线观看免费| c了瑜伽老师嗷嗷叫一节课视频| 亚洲欧洲日本天天堂在线观看| 亚洲精品AV无码喷奶水糖心| 亚洲妈妈精品一区二区三区| 亚洲中文无码亚洲人在线观看-| 夜月视频直播免费观看| 在线观看免费视频播放视频| 2020精品国产视| www.伊人网| 国产成人精品久久一区二区三区| 99国产精品| 厕所xxxxx| 国产精品人妻系列21P| 国内精品免费久久影院| 精品一品国产午夜福利视频| 久久视频这有精品63在线国产| 末班车动漫无删减免费| 青青草 久久久| 午夜噜噜噜私人影院在线播放| 亚洲色欲国产AV精品综合| 37pao成人国产永久免费视频| xiao776唯美清纯|