為了系統間解耦,我們通常會引入MQ框架,大家各司其職共同完成上下游的業務流程。
大致過程:
生產端,創建一條消息,通過網絡發送到MQ Server
MQ將 消息存儲在topic 的一個分區里
消費端,從分區中拉取消息,消費處理
但現實往往不一樣!MQ 架構設計要滿足高并發、高性能、高可用等指標
單分區,達不到我們的吞吐量要求,我們考慮采用多分區架構設計,正所謂 ”三個臭皮匠賽過一個諸葛亮“,多分區可以有效分攤全局壓力,提升整體系統性能。
兩臺 MQ機器,組成一個集群,原先一個分區存儲6條消息,現在分攤到兩個分區,每個分區各存儲3條消息,性能比上面那個提升一倍。
貌似可以滿足我們的需求,但任何事情都有兩面性!
我們看看下面業務場景:
一個用戶在電商網站上下訂單到交易完成,中間會經歷一系列動作,訂單的狀態也會隨之變化,一個訂單會產生多條MQ消息,下單、付款、發貨、買家確認收貨,消費端需要嚴格按照業務狀態機的順序處理,否則,就會出現業務問題。
我們發現,消息帶上了狀態,不再是一個個獨立的個體,有了上下文依賴關系!
對于這個問題,突然想到HTTP協議,其本身也是無狀態的,也就是說前后兩次請求沒有關聯,但有些業務功能有登錄要求,那怎么解決?
引入Cookie機制,每次請求客戶端額外傳輸一些數據,來達到上下文關聯。
回到MQ的消息順序問題,我們要如何解決?
答案:各退一步,保證局部有序。
比如上面的電商例子,只要保證一個訂單的多條狀態消息在同一個分區,便可以滿足業務需求,這個方案可以覆蓋大部分的業務場景。
這里面只需要有一個路由策略組件,由它決定消息該放到哪個分區中!
考慮到市面MQ開源框架很多,常見的如:Kafka、Pulsar、RabbitMQ、RocketMQ 等,API方法略有區別,但設計思路是相通的。
接下來,我們以 RocketMQ 為例:
生產端提供了一個接口 MessageQueueSelector
public interface MessageQueueSelector {
MessageQueue select(final List《MessageQueue》 mqs, final Message msg, final Object arg);
}
接口內定義一個select方法,具體參數含義:
mqs:該Topic下所有的隊列分片
msg:待發送的消息
arg:發送消息時傳遞的參數
關于MessageQueueSelector接口,RocketMQ 框架提供了三個默認實現類:
1、SelectMessageQueueByHash:
arg參數的hashcode的絕對值,然后對mqs.size()取余,得到目標隊列在mqs的下標
2、SelectMessageQueueByRandom:
對mqs.size()值取隨機數作為目標隊列在mqs的下標
3、SelectMessageQueueByMachineRoom
返回null
特別注意:
雖然保證了單個分片的消息有序,但每個分片的消費者只能是單線程處理,因為多線程無法控制消費順序。這個可能會損失一些性能。
這里又引出另一個問題,如何保證一個隊列只能有一個消費端呢?
1、
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance
遍歷一個topic下所有的MessageQueue
isOrder && !this.lock(mq) 嘗試對它加鎖,確保一個MessageQueue只能被一個消費者處理
2、將PullRequest對象放入PullMessageService的pullRequestQueue隊列中
public void dispatchPullRequest(List《PullRequest》 pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info(“doRebalance, {}, add a new pull request {}”, consumerGroup, pullRequest);
}
}
3、org.apache.rocketmq.client.impl.consumer.PullMessageService#run
PullMessageService 是一個Runnable線程任務
無限循環,從隊列中拉取、處理消息
另一個問題,如何保證一個隊列,只有一個線程在處理消息呢?
1、 DefaultMQPushConsumerImpl#pullMessage
ConsumeMessageService 中有兩個實現類,因為我們有消費順序要求,會選擇ConsumeMessageOrderlyService來處理業務
2、 ConsumeMessageOrderlyService.ConsumeRequest
從ConcurrentMap中獲取messageQueue對應的鎖對象
通過 synchronized 關鍵字,線程來搶占鎖,互斥關系,從而保證了一個MessageQueue只能有一個線程并發處理
繼續往下看,如果擴容了怎么辦?
原來有6個分區,order_id_1的消息在MessageQueue6 中,此時擴容一倍,現在12個分區,order_id_1訂單后面產生的消息可能路由到了MessageQueue8 中,同一個訂單的消息分布在兩個分區中,無法保證順序。
我們能做的是,先將存量消息處理完,再擴容。如果是在線業務,可以搞個臨時topic,先將消息暫時堆積,待擴容后,按新的路由規則重新發送。
順序消息,如果某條失敗了怎么辦?會不會一直阻塞?
1、如果失敗,不會提交消費位移,系統會自動重試(有重試上限),此時會阻塞后面的消息消費,直到這條消息處理完
2、如果這個消息達到重試上限,依然失敗,會進入死信隊列,可以繼續處理后面的消息
責任編輯:haq
-
數據
+關注
關注
8文章
7002瀏覽量
88943 -
框架
+關注
關注
0文章
403瀏覽量
17475
原文標題:面試官問: 如何保證 MQ 消息是有序的?
文章出處:【微信號:DBDevs,微信公眾號:數據分析與開發】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論