引言
本文主要介紹在使用 RocketMQ 時為什么需要重試與兜底機制,生產(chǎn)者與消費者觸發(fā)重試的條件和具體行為,如何在 RocketMQ 中合理使用重試機制,幫助構(gòu)建彈性,高可用系統(tǒng)的最佳實踐。
RocketMQ 的重試機制包括三部分,分別是生產(chǎn)者重試,服務(wù)端內(nèi)部數(shù)據(jù)復(fù)制遇到非預(yù)期問題時重試,消費者消費重試。本文中僅討論生產(chǎn)者重試和消費者消費重試兩種面向用戶側(cè)的實現(xiàn)。
生產(chǎn)者發(fā)送重試
Cloud Native
RocketMQ 的生產(chǎn)者在發(fā)送消息到服務(wù)端時,可能會因為網(wǎng)絡(luò)問題,服務(wù)異常等原因?qū)е抡{(diào)用失敗,這時候應(yīng)該怎么辦?如何盡可能的保證消息不丟失呢?
1. 生產(chǎn)者重試次數(shù)
RocketMQ 在客戶端中內(nèi)置了請求重試邏輯,支持在初始化時配置消息發(fā)送最大重試次數(shù)(默認為 2 次),失敗時會按照設(shè)置的重試次數(shù)重新發(fā)送。直到消息發(fā)送成功,或者達到最大重試次數(shù)時結(jié)束,并在最后一次失敗后返回調(diào)用錯誤的響應(yīng)。對于同步發(fā)送和異步發(fā)送,均支持消息發(fā)送重試。
同步發(fā)送:調(diào)用線程會一直阻塞,直到某次重試成功或最終重試失敗(返回錯誤碼或拋出異常)。
異步發(fā)送:調(diào)用線程不會阻塞,但調(diào)用結(jié)果會通過回調(diào)的形式,以異常事件或者成功事件返回。
2. 生產(chǎn)者重試間隔
在介紹生產(chǎn)者重試前,我們先來了解下流控的概念,流控一般是指服務(wù)端壓力過大,容量不足時服務(wù)端會限制客戶端收發(fā)消息的行為,是服務(wù)端自我保護的一種設(shè)計。RocketMQ 會根據(jù)當(dāng)前是否觸發(fā)了流控而采用不同的重試策略:
非流控錯誤場景:其他觸發(fā)條件觸發(fā)重試后,均會立即進行重試,無等待間隔。流控錯誤場景:系統(tǒng)會按照預(yù)設(shè)的指數(shù)退避策略進行延遲重試。
為什么要引入退避和隨機抖動?
如果故障是由過載流控引起的,重試會增加服務(wù)端負載,導(dǎo)致情況進一步惡化,因此客戶端在遇到流控時會在兩次嘗試之間等待一段時間。每次嘗試后的等待時間都呈指數(shù)級延長。指數(shù)回退可能導(dǎo)致很長的回退時間,因為指數(shù)函數(shù)增長很快。指數(shù)退避算法通過以下參數(shù)控制重試行為,更多信息,請參見 connection-backoff.md。INITIAL_BACKOFF:第一次失敗重試前后需等待多久,默認值:1 秒;
MULTIPLIER :指數(shù)退避因子,即退避倍率,默認值:1.6;
JITTER :隨機抖動因子,默認值:0.2;
MAX_BACKOFF :等待間隔時間上限,默認值:120 秒;
MIN_CONNECT_TIMEOUT :最短重試間隔,默認值:20 秒。
ConnectWithBackoff()
current_backoff = INITIAL_BACKOFF
current_deadline = now() + INITIAL_BACKOFF
while (TryConnect(Max(current_deadline, now() + MIN_CONNECT_TIMEOUT))!= SUCCESS)
SleepUntil(current_deadline)
current_backoff = Min(current_backoff * MULTIPLIER, MAX_BACKOFF)
current_deadline = now() + current_backoff + UniformRandom(-JITTER * current_backoff, JITTER * current_backoff)
特別說明:對于事務(wù)消息,只會進行透明重試(transparent retries),網(wǎng)絡(luò)超時或異常等場景不會進行重試。
3. 重試帶來的副作用
不停的重試看起來很美好,但也是有副作用的,主要包括兩方面:消息重復(fù),服務(wù)端壓力增大
遠程調(diào)用的不確定性,因請求超時觸發(fā)消息發(fā)送重試流程,此時客戶端無法感知服務(wù)端的處理結(jié)果;客戶端進行的消息發(fā)送重試可能會導(dǎo)致消費方重復(fù)消費,應(yīng)該按照用戶ID、業(yè)務(wù)主鍵等信息冪等處理消息。
較多的重試次數(shù)也會增大服務(wù)端的處理壓力。
4. 用戶的最佳實踐是什么
1)合理設(shè)置發(fā)送超時時間,發(fā)送的最大次數(shù)
發(fā)送的最大次數(shù)在初始化客戶端時配置在 ClientConfiguration;對于某些實時調(diào)用類場景,可能會導(dǎo)致消息發(fā)送請求鏈路被阻塞導(dǎo)致業(yè)務(wù)請求整體耗時高或耗時;需要合理評估每次調(diào)用請求的超時時間以及最大重試次數(shù),避免影響全鏈路的耗時。2)如何保證發(fā)送消息不丟失由于分布式環(huán)境的復(fù)雜性,例如網(wǎng)絡(luò)不可達時 RocketMQ 客戶端發(fā)送請求重試機制并不能保證消息發(fā)送一定成功。業(yè)務(wù)方需要捕獲異常,并做好冗余保護處理,常見的解決方案有兩種:
向調(diào)用方返回業(yè)務(wù)處理失敗;
嘗試將失敗的消息存儲到數(shù)據(jù)庫,然后由后臺線程定時重試,保證業(yè)務(wù)邏輯的最終一致性。
3)關(guān)注流控異常導(dǎo)致無法重試觸發(fā)流控的根本原因是系統(tǒng)容量不足,如果因為突發(fā)原因觸發(fā)消息流控,且客戶端內(nèi)置的重試流程執(zhí)行失敗,
則建議執(zhí)行服務(wù)端擴容,將請求調(diào)用臨時替換到其他系統(tǒng)進行應(yīng)急處理。4)早期版本客戶端如何使用故障延遲機制進行發(fā)送重試?對于 RocketMQ 4.x 和 3.x 以下客戶端開啟故障延遲機制可以用:
producer.setSendLatencyFaultEnable(true)
配置重試次數(shù)使用:
producer.setRetryTimesWhenSendFailed()producer.setRetryTimesWhenSendAsyncFailed()
消費者消費重試
Cloud Native
消息中間件做異步解耦時的一個典型問題是如果下游服務(wù)處理消息事件失敗,那應(yīng)該怎么做呢?RocketMQ 的消息確認機制以及消費重試策略可以幫助分析如下問題:
如何保證業(yè)務(wù)完整處理消息?
消費重試策略可以在設(shè)計實現(xiàn)消費者邏輯時保證每條消息處理的完整性,避免部分消息消費異常導(dǎo)致業(yè)務(wù)狀態(tài)不一致。
業(yè)務(wù)應(yīng)用異常時處理中的消息狀態(tài)如何恢復(fù)?
當(dāng)系統(tǒng)出現(xiàn)異常(宕機故障)等場景時,處理中的消息狀態(tài)如何恢復(fù),消費重試具體行為是什么。
1. 什么是消費重試?
什么時候認為消費失敗?
消費者在接收到消息后將調(diào)用用戶的消費函數(shù)執(zhí)行業(yè)務(wù)邏輯。如果客戶端返回消費失敗 ReconsumeLater,拋出非預(yù)期異常,或消息處理超時(包括在 PushConsumer 中排隊超時),只要服務(wù)端服務(wù)端一定時間內(nèi)沒收到響應(yīng),將認為消費失敗。
消費重試是什么?
消費者在消費某條消息失敗后,服務(wù)端會根據(jù)重試策略重新向客戶端投遞該消息。超過一次定數(shù)后若還未消費成功,則該消息將不再繼續(xù)重試,直接被發(fā)送到死信隊列中;
重試過程狀態(tài)機:消息在重試流程中的狀態(tài)和變化邏輯;
重試間隔:上一次消費失敗或超時后,下次重新嘗試消費的間隔時間;
最大重試次數(shù):消息可被重試消費的最大次數(shù)。
2. 消息重試的場景
需要注意重試是應(yīng)對異常情況,給予程序再次消費失敗消息的機會,不應(yīng)該被用作常態(tài)化的鏈路。
推薦使用場景:
業(yè)務(wù)處理失敗,失敗原因跟當(dāng)前的消息內(nèi)容相關(guān),預(yù)期一段時間后可執(zhí)行成功;
是一個小概率事件,對于大批的消息只有很少量的失敗,后面的消息大概率會消費成功,是非常態(tài)化的。
正例:消費邏輯是扣減庫存,極少量商品因為樂觀鎖版本沖突導(dǎo)致扣減失敗,重試一般立刻成功。錯誤使用場景:
消費處理邏輯中使用消費失敗來做條件判斷的結(jié)果分流,是不合理的。
反例:訂單在數(shù)據(jù)庫中狀態(tài)已經(jīng)是已取消,此時如果收到發(fā)貨的消息,處理時不應(yīng)返回消費失敗,而應(yīng)該返回成功并標(biāo)記不用發(fā)貨。
消費處理中使用消費失敗來做處理速率限流,是不合理的。
限流的目的是將超出流量的消息暫時堆積在隊列中達到削峰的作用,而不是讓消息進入重試鏈路。
這種做法會讓消息反復(fù)在服務(wù)端和客戶端之間傳遞,增大了系統(tǒng)的開銷,主要包括以下方面:
RocketMQ 內(nèi)部重試涉及寫放大,每一次重試將生成新的重試消息,大量重試將帶來嚴重的 IO 壓力;
重試有復(fù)雜的退避邏輯,內(nèi)部實現(xiàn)為梯度定時器,該定時器本身不具備高吞吐的特性,大量重試將導(dǎo)致重試消息無法及時出隊。重試的間隔將不穩(wěn)定,將導(dǎo)致大量重試消息延后消費,即削峰的周期被大幅度延長。
3. 不要以重試替代限流
上述誤用的場景實際上是組合了限流和重試能力來進行削峰,RocketMQ 推薦的削峰最佳手段為組合限流和堆積,業(yè)務(wù)以保護自身為前提,需要對消費流量進行限流,并利用 RocketMQ 提供的堆積能力將超出業(yè)務(wù)當(dāng)前處理的消息滯后消費,以達到削峰的目的。下圖中超過處理能力的消息都應(yīng)該被堆積在服務(wù)端,而不是通過消費失敗進行重試。
如果不想依賴額外的產(chǎn)品/組件來完成該功能,也可以利用一些本地工具類,比如 Guava 的 RateLimiter 來完成單機限流。如下所示,聲明一個 50 QPS 的 RateLimiter,在消費前以阻塞的方式 acquire 一個令牌,獲取到即處理消息,未獲取到阻塞。
RateLimiter rateLimiter = RateLimiter.create(50); PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) // 設(shè)置訂閱組名稱 .setConsumerGroup(consumerGroup) // 設(shè)置訂閱的過濾器 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView -> { // 阻塞直到獲得一個令牌,也可以配置一個超時時間 rateLimiter.acquire(); LOGGER.info("Consume message={}", messageView); return ConsumeResult.SUCCESS; }) .build();
4. PushConsumer 消費重試策略
PushConsumer 消費消息時,消息的幾個主要狀態(tài)如下:
Ready:已就緒狀態(tài)。消息在消息隊列RocketMQ版服務(wù)端已就緒,可以被消費者消費;
Inflight:處理中狀態(tài)。消息被消費者客戶端獲取,處于消費中還未返回消費結(jié)果的狀態(tài);
Commit:提交狀態(tài)。消費成功的狀態(tài),消費者返回成功響應(yīng)即可結(jié)束消息的狀態(tài)機;
DLQ:死信狀態(tài)
消費邏輯的最終兜底機制,若消息一直處理失敗并不斷進行重試,直到超過最大重試次數(shù)還未成功,此時消息不會再重試。
該消息會被投遞至死信隊列。您可以通過消費死信隊列的消息進行業(yè)務(wù)恢復(fù)。
最大重試次數(shù)
PushConsumer 的最大重試次數(shù)由創(chuàng)建時決定。例如,最大重試次數(shù)為 3 次,則該消息最多可被投遞 4 次,1 次為原始消息,3 次為重試投遞次數(shù)。
重試間隔時間
無序消息(非順序消息):重試間隔為階梯時間,具體時間如下:
說明:若重試次數(shù)超過 16 次,后面每次重試間隔都為 2 小時。
順序消息:重試間隔為固定時間,默認為 3 秒。
5. SimpleConsumer 消費重試策略
和 PushConsumer 消費重試策略不同,SimpleConsumer 消費者的重試間隔是預(yù)分配的,每次獲取消息消費者會在調(diào)用 API 時設(shè)置一個不可見時間參數(shù) InvisibleDuration,即消息的最大處理時長。若消息消費失敗觸發(fā)重試,不需要設(shè)置下一次重試的時間間隔,直接復(fù)用不可見時間參數(shù)的取值。
由于不可見時間為預(yù)分配的,可能和實際業(yè)務(wù)中的消息處理時間差別較大,可以通過 API 接口修改不可見時間。例如,預(yù)設(shè)消息處理耗時最多 20 ms,但實際業(yè)務(wù)中 20 ms內(nèi)消息處理不完,可以修改消息不可見時間,延長消息處理時間,避免消息觸發(fā)重試機制。修改消息不可見時間需要滿足以下條件:
消息處理未超時
消息處理未提交消費狀態(tài)
如下圖所示,消息不可見時間修改后立即生效,即從調(diào)用 API 時刻開始,重新計算消息不可見時間。
最大重試次數(shù)
與 PushConsumer 相同。
消息重試間隔
消息重試間隔 = 不可見時間 - 消息實際處理時長例如:消息不可見時間為 30 ms,實際消息處理用了 10 ms 就返回失敗響應(yīng),則距下次消息重試還需要 20 ms,此時的消息重試間隔即為 20 ms;若直到 30 ms 消息還未處理完成且未返回結(jié)果,則消息超時,立即重試,此時重試間隔即為 0 ms。SimpleConsumer 的消費重試間隔通過消息的不可見時間控制。
//消費示例:使用SimpleConsumer消費普通消息,主動獲取消息處理并提交。
ClientServiceProvider provider1 = ClientServiceProvider.loadService();
String topic1 = "Your Topic";
FilterExpression filterExpression1 = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider1.newSimpleConsumerBuilder()
//設(shè)置消費者分組。
.setConsumerGroup("Your ConsumerGroup")
//設(shè)置接入點。
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
//設(shè)置預(yù)綁定的訂閱關(guān)系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
List messageViewList = null;
try {
//SimpleConsumer需要主動獲取消息,并處理。
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消費處理完成后,需要主動調(diào)用ACK提交消費結(jié)果。
//沒有ack會被認為消費失敗
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系統(tǒng)流控等原因造成拉取失敗,需要重新發(fā)起獲取消息請求。
e.printStackTrace();
}
修改消息的不可見時間案例:某產(chǎn)品使用消息隊列來發(fā)送解耦“視頻渲染”的業(yè)務(wù)邏輯,發(fā)送方發(fā)送任務(wù)編號,消費方收到編號后處理任務(wù)。由于消費方的業(yè)務(wù)邏輯耗時較長,消費者重新消費到同一個任務(wù)時,該任務(wù)未完成,只能返回消費失敗。在這種全新的 API 下,用戶可以調(diào)用可以通過修改不可見時間給消息續(xù)期,實現(xiàn)對單條消息狀態(tài)的精確控制。
simpleConsumer.changeInvisibleDuration();
simpleConsumer.changeInvisibleDurationAsync();
6. 功能約束與最佳實踐
設(shè)置消費的最大超時時間和次數(shù)
盡快明確的向服務(wù)端返回成功或失敗,不要以超時(有時是異常拋出)代替消費失敗。
不要用重試機制來進行業(yè)務(wù)限流
錯誤示例:如果當(dāng)前消費速度過高觸發(fā)限流,則返回消費失敗,等待下次重新消費。正確示例:如果當(dāng)前消費速度過高觸發(fā)限流,則延遲獲取消息,稍后再消費。
發(fā)送重試和消費重試會導(dǎo)致相同的消息重復(fù)消費,消費方應(yīng)該有一個良好的冪等設(shè)計
正確示例:某系統(tǒng)中消費的邏輯是為某個用戶發(fā)送短信,該短信已經(jīng)發(fā)送成功了,當(dāng)消費者應(yīng)用重復(fù)收到該消息,此時應(yīng)該返回消費成功。
總結(jié)
Cloud Native
本文主要介紹重試的基本概念,生產(chǎn)者消費者收發(fā)消息時觸發(fā)重試的條件和具體行為,以及 RocketMQ 收發(fā)容錯的最佳實踐。重試策略幫助我們從隨機的、短暫的瞬態(tài)故障中恢復(fù),是在容忍錯誤時,提高可用性的一種強大機制。但請謹記 “重試是對于分布式系統(tǒng)來說自私的”,因為客戶端認為其請求很重要,并要求服務(wù)端花費更多資源來處理,盲目的重試設(shè)計不可取,合理的使用重試可以幫助我們構(gòu)建更加彈性且可靠的系統(tǒng)。
審核編輯:郭婷
-
線程
+關(guān)注
關(guān)注
0文章
505瀏覽量
19705
原文標(biāo)題:RocketMQ重試機制詳解及最佳實踐
文章出處:【微信號:OSC開源社區(qū),微信公眾號:OSC開源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論