消息隊列使用場景 為什么會需要消息隊列(MQ)?
解耦
在項目啟動之初來預測將來項目會碰到什么需求,是極其困難的。消息系統在處理過程中間插入了一個隱含的、基于數據的接口層,兩邊的處理過程都要實現這一接口。這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
冗余
有些情況下,處理數據的過程會失敗。除非數據被持久化,否則將造成丟失。消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的”插入-獲取-刪除”范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
擴展性
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調節參數。擴展就像調大電力按鈕一樣簡單。
靈活性 & 峰值處理能力
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見;如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
可恢復性
系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。
順序保證
在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數據會按照特定的順序來處理。Kafka保證一個Partition內的消息的有序性。
緩沖
在任何重要的系統中,都會有需要不同的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列通過一個緩沖層來幫助任務最高效率的執行———寫入隊列的處理會盡可能的快速。該緩沖有助于控制和優化數據流經過系統的速度。
異步通信
很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
MQ常用的使用場景:
1. 進程間通訊和系統間的消息通知,比如在分布式系統中。
2. 解耦,比如像我們公司有許多開發團隊,每個團隊負責業務的不同模塊,各個開發團隊可以使用MQ來通信。
3. 在一些高并發場景下,使用MQ的異步特性。
消息隊列和RPC對比 系統架構
RPC系統結構:
+----------++----------+|Consumer|<=>|Provider|+----------++----------+
Consumer調用的Provider提供的服務。
Message Queue系統結構:
+--------++-------++----------+|Sender|<=>|Queue|<=>|Receiver|+--------++-------++----------+
Sender發送消息給Queue;Receiver從Queue拿到消息來處理。
功能特點
在架構上,RPC和Message Queue的差異點是,Message Queue有一個中間結點Message Queue(broker),可以把消息存儲。
消息隊列的特點
Message Queue把請求的壓力保存一下,逐漸釋放出來,讓處理者按照自己的節奏來處理。
Message Queue引入一下新的結點,讓系統的可靠性會受Message Queue結點的影響。
Message Queue是異步單向的消息。發送消息設計成是不需要等待消息處理的完成。
所以對于有同步返回需求,用Message Queue則變得麻煩了。
RPC的特點
同步調用,對于要等待返回結果/處理結果的場景,RPC是可以非常自然直覺的使用方式。RPC也可以是異步調用。
由于等待結果,Consumer(Client)會有線程消耗。
如果以異步RPC的方式使用,Consumer(Client)線程消耗可以去掉。但不能做到像消息一樣暫存消息/請求,壓力會直接傳導到服務Provider。
RPC適用場合說明
希望同步得到結果的場合,RPC合適。
希望使用簡單,則RPC;RPC操作基于接口,使用簡單,使用方式模擬本地調用。異步的方式編程比較復雜。
不希望發送端(RPC Consumer、Message Sender)受限于處理端(RPC Provider、Message Receiver)的速度時,使用Message Queue。
隨著業務增長,有的處理端處理量會成為瓶頸,會進行同步調用到異步消息的改造。這樣的改造實際上有調整業務的使用方式。
比如原來一個操作頁面提交后就下一個頁面會看到處理結果;改造后異步消息后,下一個頁面就會變成“操作已提交,完成后會得到通知”。
RPC不適用場合說明
RPC同步調用使用Message Queue來傳輸調用信息。 上面分析可以知道,這樣的做法,發送端是在等待,同時占用一個中間點的資源。變得復雜了,但沒有對等的收益。
對于返回值是void的調用,可以這樣做,因為實際上這個調用業務上往往不需要同步得到處理結果的,只要保證會處理即可。(RPC的方式可以保證調用返回即處理完成,使用消息方式后這一點不能保證了。)
返回值是void的調用,使用消息,效果上是把消息的使用方式Wrap成了服務調用(服務調用使用方式成簡單,基于業務接口)。
常用的消息隊列及使用場景 ActiveMQ
AcitveMQ是作為一種消息存儲和分發組件,涉及到client與broker端數據交互的方方面面,它不僅要擔保消息的存儲安全性,還要提供額外的手段來確保消息的分發是可靠的。
ActiveMQ消息傳送機制
Producer客戶端使用來發送消息的, Consumer客戶端用來消費消息;它們的協同中心就是ActiveMQ broker,broker也是讓producer和consumer調用過程解耦的工具,最終實現了異步RPC/數據交換的功能。隨著ActiveMQ的不斷發展,支持了越來越多的特性,也解決開發者在各種場景下使用ActiveMQ的需求。比如producer支持異步調用;使用flow control機制讓broker協同consumer的消費速率;consumer端可以使用prefetchACK來最大化消息消費的速率;提供”重發策略”等來提高消息的安全性等。在此我們不詳細介紹。
一條消息的生命周期如下:
?
圖片中簡單的描述了一條消息的生命周期,不過在不同的架構環境中,message的流動行可能更加復雜.將在稍后有關broker的架構中詳解..一條消息從producer端發出之后,一旦被broker正確保存,那么它將會被consumer消費,然后ACK,broker端才會刪除;不過當消息過期或者存儲設備溢出時,也會終結它。
ActiveMQ的安裝
啟動后,activeMQ會占用兩個端口,一個是負責接收發送消息的tcp端口:61616,一個是基于web負責用戶界面化管理的端口:8161。這兩個端口可以在conf下面的xml中找到。http服務器使用了jettry。這里有個問題是啟動mq后,很長時間管理界面才可以顯示出來。可以使用netstat -an|find “61616”來測試ActiveMQ是否啟動。
Jms與ActiveMQ的結合
JMS是一個用于提供消息服務的技術規范,它制定了在整個消息服務提供過程中的所有數據結構和交互流程。而MQ則是消息隊列服務,是面向消息中間件(MOM)的最終實現,是真正的服務提供者;MQ的實現可以基于JMS,也可以基于其他規范或標準。目前選擇的最多的是ActiveMQ。
JMS支持兩種消息傳遞模型:點對點(point-to-point,簡稱PTP)和發布/訂閱(publish/subscribe,簡稱pub/sub)。這兩種消息傳遞模型非常相似,但有以下區別:
PTP消息傳遞模型規定了一條消息之恩能夠傳遞費一個接收方。
Pub/sub消息傳遞模型允許一條消息傳遞給多個接收方
點對點模型
通過點對點的消息傳遞模型,一個應用程序可以向另外一個應用程序發送消息。在此傳遞模型中,目標類型是隊列。消息首先被傳送至隊列目標,然后從該隊列將消息傳送至對此隊列進行監聽的某個消費者,如下圖:
?
一個隊列可以關聯多個隊列發送方和接收方,但一條消息僅傳遞給一個接收方。如果多個接收方正在監聽隊列上的消息,JMS Provider將根據“先來者優先”的原則確定由哪個價售房接受下一條消息。如果沒有接收方在監聽隊列,消息將保留在隊列中,直至接收方連接到隊列為止。這種消息傳遞模型是傳統意義上的拉模型或輪詢模型。在此列模型中,消息不時自動推動給客戶端的,而是要由客戶端從隊列中請求獲得。
點對點模型的代碼(springboot+jms+activemq)實現如下:
@Service("queueproducer")publicclassQueueProducer{@Autowired// 也可以注入JmsTemplate,JmsMessagingTemplate對JmsTemplate進行了封裝privateJmsMessagingTemplate jmsMessagingTemplate;// 發送消息,destination是發送到的隊列,message是待發送的消息@Scheduled(fixedDelay=3000)//每3s執行1次publicvoidsendMessage(Destination destination,finalString message){ jmsMessagingTemplate.convertAndSend(destination, message); }@JmsListener(destination="out.queue")publicvoidconsumerMessage(String text){ System.out.println("從out.queue隊列收到的回復報文為:"+text); } }
Producer的實現
@ComponentpublicclassQueueConsumer2{// 使用JmsListener配置消費者監聽的隊列,其中text是接收到的消息@JmsListener(destination ="mytest.queue")//SendTo 該注解的意思是將return回的值,再發送的"out.queue"隊列中@SendTo("out.queue")publicStringreceiveQueue(String text) { System.out.println("QueueConsumer2收到的報文為:"+text);return"return message "+text; } }
Consumer的實現
@RunWith(SpringRunner.class)@SpringBootTestpublicclassActivemqQueueTests{@AutowiredprivateQueueProducer producer;@TestpublicvoidcontextLoads()throwsInterruptedException { Destination destination =newActiveMQQueue("mytest.queue");for(inti=0; i<10; i++){ producer.sendMessage(destination,"myname is Flytiger"+ i); } } }
Test的實現
其中QueueConsumer2表明的是一個雙向隊列。
發布/訂閱模型
通過發布/訂閱消息傳遞模型,應用程序能夠將一條消息發送到多個接收方。在此傳送模型中,目標類型是主題。消息首先被傳送至主題目標,然后傳送至所有已訂閱此主題的或送消費者。如下圖:
?
主題目標也支持長期訂閱。長期訂閱表示消費者已注冊了主題目標,但在消息到達目標時該消費者可以處于非活動狀態。當消費者再次處于活動狀態時,將會接收該消息。如果消費者均沒有注冊某個主題目標,該主題只保留注冊了長期訂閱的非活動消費者的消息。與PTP消息傳遞模型不同,pub/sub消息傳遞模型允許多個主題訂閱者接收同一條消息。JMS一直保留消息,直至所有主題訂閱者都接收到消息為止。pub/sub消息傳遞模型基本上是一個推模型。在該模型中,消息會自動廣播,消費者無須通過主動請求或輪詢主題的方法來獲得新的消息。
上面兩種消息傳遞模型里,我們都需要定義消息生產者和消費者,生產者把消息發送到JMS Provider的某個目標地址(Destination),消息從該目標地址傳送至消費者。消費者可以同步或異步接收消息,一般而言,異步消息消費者的執行和伸縮性都優于同步消息接收者,體現在:
1. 異步消息接收者創建的網絡流量比較小。單向對東消息,并使之通過管道進入消息監聽器。管道操作支持將多條消息聚合為一個網絡調用。
2. 異步消息接收者使用線程比較少。異步消息接收者在不活動期間不使用線程。同步消息接收者在接收調用期間內使用線程,結果線程可能會長時間保持空閑,尤其是如果該調用中指定了阻塞超時。
3. 對于服務器上運行的應用程序代碼,使用異步消息接收者幾乎總是最佳選擇,尤其是通過消息驅動Bean。使用異步消息接收者可以防止應用程序代碼在服務器上執行阻塞操作。而阻塞操作會是服務器端線程空閑,甚至會導致死鎖。阻塞操作使用所有線程時則發生死鎖。如果沒有空余的線程可以處理阻塞操作自身解鎖所需的操作,這該操作永遠無法停止阻塞。
發布/訂閱模型的代碼(springboot+jms+activemq)實現如下:
@Service("topicproducer")publicclassTopicProducer{@Autowired// 也可以注入JmsTemplate,JmsMessagingTemplate對JmsTemplate進行了封裝privateJmsMessagingTemplate jmsMessagingTemplate;// 發送消息,destination是發送到的隊列,message是待發送的消息@Scheduled(fixedDelay=3000)//每3s執行1次publicvoidsendMessage(Destination destination,finalString message){ jmsMessagingTemplate.convertAndSend(destination, message); } }
Producer的實現
@ComponentpublicclassTopicConsumer2 {// 使用JmsListener配置消費者監聽的隊列,其中text是接收到的消息@JmsListener(destination ="mytest.topic")publicvoidreceiveTopic(String text) { System.out.println("TopicConsumer2收到的topic報文為:"+text); } }
Consumer的實現
@RunWith(SpringRunner.class)@SpringBootTestpublicclassActivemqTopicTests{@AutowiredprivateTopicProducer producer;@TestpublicvoidcontextLoads()throwsInterruptedException { Destination destination =newActiveMQTopic("mytest.topic");for(inti=0; i<3; i++){ producer.sendMessage(destination,"myname is TopicFlytiger"+ i); } } }
Test的實現
Topic模式工作時,默認只能發送和接收queue消息,如果要發送和接收topic消息,需要加入:
spring.jms.pub-sub-domain=true Queue與Topic的比較
JMS Queue執行load balancer語義
一條消息僅能被一個consumer收到。如果在message發送的時候沒有可用的consumer,那么它講被保存一直到能處理該message的consumer可用。如果一個consumer收到一條message后卻不響應它,那么這條消息將被轉到另外一個consumer那兒。一個Queue可以有很多consumer,并且在多個可用的consumer中負載均衡。
Topic實現publish和subscribe語義
一條消息被publish時,他將發送給所有感興趣的訂閱者,所以零到多個subscriber將接收到消息的一個拷貝。但是在消息代理接收到消息時,只有激活訂閱的subscriber能夠獲得消息的一個拷貝。
分別對應兩種消息模式
Point-to-Point(點對點),Publisher/Subscriber Model(發布/訂閱者)
其中在Publicher/Subscriber模式下又有Nondurable subscription(非持久化訂閱)和durable subscription(持久化訂閱)兩種消息處理方式。
ActiveMQ優缺點
優點:是一個快速的開源消息組件(框架),支持集群,同等網絡,自動檢測,TCP,SSL,廣播,持久化,XA,和J2EE1.4容器無縫結合,并且支持輕量級容器和大多數跨語言客戶端上的Java虛擬機。消息異步接受,減少軟件多系統集成的耦合度。消息可靠接收,確保消息在中間件可靠保存,多個消息也可以組成原子事務。
缺點:ActiveMQ默認的配置性能偏低,需要優化配置,但是配置文件復雜,ActiveMQ本身不提供管理工具;示例代碼少;主頁上的文檔看上去比較全面,但是缺乏一種有效的組織方式,文檔只有片段,用戶很難由淺入深進行了解,二、文檔整體的專業性太強。在研究階段可以通過查maillist、看Javadoc、分析源代碼來了解。
RabbitMQ 簡介
Rabbitmq簡介可以參考我的兩篇文章:
openstack的RPC機制之AMQP協議()
RabbitMQ高可用性()
RabbitMQ安裝好之后的默認賬號密碼是(guest/guest)
需要注意的是:
多個消費者可以訂閱同一個Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的消息并處理。這種分發方式叫做round-robin(循環的方式)。
當publisher將消息發給queue的過程中,publisher會指明routing key。Direct模式中,Direct Exchange 根據 Routing Key 進行精確匹配,只有對應的 Message Queue 會接受到消息。Topic模式中Exchange會根據routing key和bindkey進行模式匹配,決定將消息發送到哪個queue中。
有一個疑問:當有多個consumer時,rabbitmq會平均分攤給這些consumer;沒辦法把同一個message發給不同的consumer嗎?
我之前的猜想是,當有多個consumer使用topic模式訂閱消息時,所有的消息它們都會收到;但如果是direct模式,只有一個consumer會收到消息。(理解錯誤,topic和direct只是publisher用來選擇發到不同的queue,不是consumer接收消息。一個隊列一個消息只能發送給一個消費者,不然消費者的ack也會有很多,RabbitMQ Server也不好處理)
RabbitMQ的消息確認
默認情況下,如果Message 已經被某個Consumer正確的接收到了,那么該Message就會被從queue中移除。當然也可以讓同一個Message發送到很多的Consumer。
如果一個queue沒被任何的Consumer Subscribe(訂閱),那么,如果這個queue有數據到達,那么這個數據會被cache,不會被丟棄。當有Consumer時,這個數據會被立即發送到這個Consumer,這個數據被Consumer正確收到時,這個數據就被從queue中刪除。
那么什么是正確收到呢?通過ack。每個Message都要被acknowledged(確認,ack)。我們可以顯示的在程序中去ack,也可以自動的ack。如果有數據沒有被ack,那么:
RabbitMQ Server會把這個信息發送到下一個Consumer。而且ack的機制可以起到限流的作用(Benefitto throttling):在Consumer處理完成數據后發送ack,甚至在額外的延時后發送ack,將有效的balance Consumer的load。
RabbitMQ功能測試
本次測試依然是RabbitMQ+springboot,首先需要application.properties
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672spring.rabbitmq.username=guest spring.rabbitmq.password=guest
這里的端口是5672,,15672時管理端的端口。
pom要添加依賴:
評論
查看更多