RabbitMQ 是一個流行的開源消息隊列軟件,它提供了多種通信模型,例如發布/訂閱模型、路由模型、work模型等。在前面的文章中我們已經介紹了前四種模型,本文將會學習 RabbitMQ 中的 Topic 模型;接下來還會有關于RabbitMQ的系列教程,對你有幫助的話記得關注哦~
Topic 模型
Topic 模型是 RabbitMQ 的高級模型之一,Topic 模型使用了通配符的概念,可以匹配更靈活的路由規則。topic模式相當于是對路由模式的一個升級,topic模式主要就是在匹配的規則上可以實現模糊匹配。
在 Topic 模型中,生產者將消息發送到交換機,交換機根據消息的 routing key 將消息轉發到對應的隊列中。與 Direct 模型不同的是,Topic 模型中 routing key 支持通配符匹配,其中 ' ' 可以匹配一個單詞,'#' 可以匹配多個單詞。例如,"order. " 可以匹配 "order.create","order.delete" 等消息,而 "order.#" 可以匹配 "order.create.one","order.delete.two" 等消息。
適用場景
Topic 模型適用于需要靈活的消息路由規則的場景,例如:
- 新聞網站訂閱分類消息;
- 電商網站訂閱商品分類消息;
- 金融機構訂閱股票市場消息等。
演示
生產者
// 生產者 public class Producer { private static final String EXCHANGE_NAME = "exchange_topic_1"; private static final String EXCHANGE_ROUTING_KEY1 = "topic.km"; private static final String EXCHANGE_ROUTING_KEY2 = "topic.km.001"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); for (int i = 0; i < 100; i++) { // topic在路由模型的基礎上,只有路由的key發生改變,其余的都不變 if (i % 2 == 0) { channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1, MessageProperties.PERSISTENT_TEXT_PLAIN, ("topic模型發送的第 " + i + " 條信息").getBytes()); } else { channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2, MessageProperties.PERSISTENT_TEXT_PLAIN, ("topic模型發送的第 " + i + " 條信息").getBytes()); } } channel.close(); connection.close(); } }
消費者
// 消費者1 public class Consumer1 { private static final String QUEUE_NAME = "queue_topic_1"; private static final String EXCHANGE_NAME = "exchange_topic_1"; private static final String EXCHANGE_ROUTING_KEY = "topic.*"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費者1接收到的消息是:" + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } }
// 消費者2 public class Consumer2 { private static final String QUEUE_NAME = "queue_topic_2"; private static final String EXCHANGE_NAME = "exchange_topic_1"; private static final String EXCHANGE_ROUTING_KEY = "topic.#"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費者2接收到的消息是:" + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } }
測試
先啟動2個消費者,再啟動生產者
消費者1訂閱的是 "order.*" 的消息,消費者2訂閱的是 "order.#" 的消息,可以得到以下結果:
消費者1接收到的消息是:"Topic 模型發送的偶數條消息"
消費者2接收到的消息是:"Topic 模型發送的全部消息"
小結
本文介紹了 RabbitMQ 通信模型中的 Topic 模型的使用,通過交換機和 routing key 實現更靈活的消息路由。在實際使用過程中,需要注意以下幾點:
- 路由鍵的格式應該是多個單詞組成,用 '.' 分隔;
- '#' 匹配多個單詞,'*' 匹配一個單詞;
- 一個隊列可以綁定多個 routing key;
- 如果交換機沒有匹配到任何一個隊列,則會拋棄該消息。
-
交換機
+關注
關注
21文章
2637瀏覽量
99535 -
模型
+關注
關注
1文章
3226瀏覽量
48809 -
rabbitmq
+關注
關注
0文章
17瀏覽量
1026
發布評論請先 登錄
相關推薦
評論