背景
前段時間有個小項目需要使用延遲任務(wù),談到延遲任務(wù),我腦子第一時間一閃而過的就是使用消息隊列來做,比如RabbitMQ的死信隊列又或者RocketMQ的延遲隊列,但是奈何這是一個小項目,并沒有引入MQ,我也不太想因為一個延遲任務(wù)就引入MQ,增加系統(tǒng)復(fù)雜度,所以這個方案直接就被pass了。
雖然基于MQ這個方式走不通了,但是這個項目中使用到Redis,所以我就想是否能夠使用Redis來代替MQ實現(xiàn)延遲隊列的功能,于是我就查了一下有沒有現(xiàn)成可用的方案,別說,還真給我查到了兩種方案,并且我還仔細(xì)研究對比了這兩個方案,發(fā)現(xiàn)要想很好的實現(xiàn)延遲隊列,并不簡單。
監(jiān)聽過期key
基于監(jiān)聽過期key的方式來實現(xiàn)延遲隊列是我查到的第一個方案,為了弄懂這個方案實現(xiàn)的細(xì)節(jié),我還特地去扒了扒官網(wǎng),還真有所收獲
1、Redis發(fā)布訂閱模式
一談到發(fā)布訂閱模式,其實一想到的就是MQ,只不過Redis也實現(xiàn)了一套,并且跟MQ賊像,如圖:
圖中的channel的概念跟MQ中的topic的概念差不多,你可以把channel理解成MQ中的topic。
生產(chǎn)者在消息發(fā)送時需要到指定發(fā)送到哪個channel上,消費者訂閱這個channel就能獲取到消息。
在Redis中,有很多默認(rèn)的channel,只不過向這些channel發(fā)送消息的生產(chǎn)者不是我們寫的代碼,而是Redis本身。當(dāng)消費者監(jiān)聽這些channel時,就可以感知到Redis中數(shù)據(jù)的變化。
這個功能Redis官方稱為keyspace notifications,字面意思就是鍵空間通知。
這些默認(rèn)的channel被分為兩類:
以__keyspace@
舉個例子,現(xiàn)在有個消費者監(jiān)聽了__keyspace@0__:sanyou這個channel,sanyou就是Redis中的一個普通key,那么當(dāng)sanyou這個key被刪除或者發(fā)生了其它事件,那么消費者就會收到sanyou這個key刪除或者其它事件的消息
以__keyevent@
同樣舉個例子,現(xiàn)在有個消費者監(jiān)聽了__keyevent@0__:expired這個channel,代表了監(jiān)聽key的過期事件。那么當(dāng)某個Redis的key過期了(expired),那么消費者就能收到這個key過期的消息。如果把expired換成del,那么監(jiān)聽的就是刪除事件。具體支持哪些事件,可從官網(wǎng)查。
上述db是指具體的數(shù)據(jù)庫,Redis不是默認(rèn)分為16個庫么,序號從0-15,所以db就是0到15的數(shù)字,示例中的0就是指0對應(yīng)的數(shù)據(jù)庫。
3、延遲隊列實現(xiàn)原理
通過對上面的兩個概念了解之后,應(yīng)該就對監(jiān)聽過期key的實現(xiàn)原理一目了然了,其實就是當(dāng)這個key過期之后,Redis會發(fā)布一個key過期的事件到__keyevent@
所以這種方式實現(xiàn)延遲隊列就只需要兩步:
發(fā)送延遲任務(wù),key是延遲消息本身,過期時間就是延遲時間
監(jiān)聽__keyevent@
4、demo
好了,基本概念和核心原理都說完了之后,又到了show me the code環(huán)節(jié)。
好巧不巧,Spring已經(jīng)實現(xiàn)了監(jiān)聽__keyevent@*__:expired這個channel這個功能,__keyevent@*__:expired中的*代表通配符的意思,監(jiān)聽所有的數(shù)據(jù)庫。
所以demo寫起來就很簡單了,只需3步即可
引入pom
org.springframework.boot spring-boot-starter-data-redis 2.2.5.RELEASE org.springframework.boot spring-boot-starter-web 2.2.5.RELEASE
配置類
@Configuration publicclassRedisConfiguration{ @Bean publicRedisMessageListenerContainerredisMessageListenerContainer(RedisConnectionFactoryconnectionFactory){ RedisMessageListenerContainerredisMessageListenerContainer=newRedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(connectionFactory); returnredisMessageListenerContainer; } @Bean publicKeyExpirationEventMessageListenerredisKeyExpirationListener(RedisMessageListenerContainerredisMessageListenerContainer){ returnnewKeyExpirationEventMessageListener(redisMessageListenerContainer); } }
KeyExpirationEventMessageListener實現(xiàn)了對__keyevent@*__:expiredchannel的監(jiān)聽
當(dāng)KeyExpirationEventMessageListener收到Redis發(fā)布的過期Key的消息的時候,會發(fā)布RedisKeyExpiredEvent事件
所以我們只需要監(jiān)聽RedisKeyExpiredEvent事件就可以拿到過期消息的Key,也就是延遲消息。
對RedisKeyExpiredEvent事件的監(jiān)聽實現(xiàn)MyRedisKeyExpiredEventListener
@Component publicclassMyRedisKeyExpiredEventListenerimplementsApplicationListener{ @Override publicvoidonApplicationEvent(RedisKeyExpiredEventevent){ byte[]body=event.getSource(); System.out.println("獲取到延遲消息:"+newString(body)); } }
整個工程目錄也簡單
代碼寫好,啟動應(yīng)用
之后我直接通過Redis命令設(shè)置消息,就沒通過代碼發(fā)送消息了,消息的key為sanyou,值為task,值不重要,過期時間為5s
setsanyoutask expiresanyou5
如果上面都理論都正確,不出意外的話,5s后MyRedisKeyExpiredEventListener應(yīng)該可以監(jiān)聽到sanyou這個key過期的消息,也就相當(dāng)于拿到了延遲任務(wù),控制臺會打印出獲取到延遲消息:sanyou。
于是我滿懷希望,靜靜地等待了5s。。
5、4、3、2、1,時間一到,我查看控制臺,但是控制臺并沒有按照預(yù)期打印出上面那句話。
為什么會沒打印出?難道是代碼寫錯了?正當(dāng)我準(zhǔn)備檢查代碼的時候,官網(wǎng)的一段話道出了真實原因。
我給大家翻譯一下上面這段話講的內(nèi)容。
上面這段話主要討論的是key過期事件的時效性問題,首先提到了Redis過期key的兩種清除策略,就是面試八股文常背的兩種:
惰性清除。當(dāng)這個key過期之后,訪問時,這個Key才會被清除
定時清除。后臺會定期檢查一部分key,如果有key過期了,就會被清除
再后面那段話是核心,意思是說,key的過期事件發(fā)布時機(jī)并不是當(dāng)這個key的過期時間到了之后就發(fā)布,而是這個key在Redis中被清理之后,也就是真正被刪除之后才會發(fā)布。
到這我終于明白了,上面的例子中即使我設(shè)置了5s的過期時間,但是當(dāng)5s過去之后,只要兩種清除策略都不滿足,沒人訪問sanyou這個key,后臺的定時清理的任務(wù)也沒掃描到sanyou這個key,那么就不會發(fā)布key過期的事件,自然而然也就監(jiān)聽不到了。
至于后臺的定時清理的任務(wù)什么時候能掃到,這個沒有固定時間,可能一到過期時間就被掃到,也可能等一定時間才會被掃到,這就可能會造成了客戶端從發(fā)布到監(jiān)聽到的消息時間差會大于等于過期時間,從而造成一定時間消息的延遲,這就著實有點坑了。。
5、坑
除了上面測試demo的時候遇到的坑之外,在我深入研究之后,還發(fā)現(xiàn)了一些更離譜的坑。
丟消息太頻繁
Redis的丟消息跟MQ不一樣,因為MQ都會有消息的持久化機(jī)制,可能只有當(dāng)機(jī)器宕機(jī)了,才會丟點消息,但是Redis丟消息就很離譜,比如說你的服務(wù)在重啟的時候就消息會丟消息。
Redis實現(xiàn)的發(fā)布訂閱模式,消息是沒有持久化機(jī)制,當(dāng)消息發(fā)布到某個channel之后,如果沒有客戶端訂閱這個channel,那么這個消息就丟了,并不會像MQ一樣進(jìn)行持久化,等有消費者訂閱的時候再給消費者消費。
所以說,假設(shè)服務(wù)重啟期間,某個生產(chǎn)者或者是Redis本身發(fā)布了一條消息到某個channel,由于服務(wù)重啟,沒有監(jiān)聽這個channel,那么這個消息自然就丟了。
消息消費只有廣播模式
Redis的發(fā)布訂閱模式消息消費只有廣播模式一種。
所謂的廣播模式就是多個消費者訂閱同一個channel,那么每個消費者都能消費到發(fā)布到這個channel的所有消息。
如圖,生產(chǎn)者發(fā)布了一條消息,內(nèi)容為sanyou,那么兩個消費者都可以同時收到sanyou這條消息。
所以,如果通過監(jiān)聽channel來獲取延遲任務(wù),那么一旦服務(wù)實例有多個的話,還得保證消息不能重復(fù)處理,額外地增加了代碼開發(fā)量。
接收到所有key的某個事件
這個不屬于Redis發(fā)布訂閱模式的問題,而是Redis本身事件通知的問題。
當(dāng)消費者監(jiān)聽了以__keyevent@
舉個例子,某個消費者監(jiān)聽了__keyevent@*__:expired這個channel,那么只要key過期了,不管這個key是張三還會李四,消費者都能收到。
所以如果你只想消費某一類消息的key,那么還得自行加一些標(biāo)記,比如消息的key加個前綴,消費的時候判斷一下帶前綴的key就是需要消費的任務(wù)。
所以,綜上能夠得出一個非常重要的結(jié)論,那就是監(jiān)聽Redis過期Key這種方式實現(xiàn)延遲隊列,不穩(wěn)定,坑賊多!
那有沒有比較靠譜的延遲隊列的實現(xiàn)方案呢?這就不得不提到我研究的第二種方案了。
Redisson實現(xiàn)延遲隊列
Redisson他是Redis的兒子(Redis son),基于Redis實現(xiàn)了非常多的功能,其中最常使用的就是Redis分布式鎖的實現(xiàn),但是除了實現(xiàn)Redis分布式鎖之外,它還實現(xiàn)了延遲隊列的功能。
先來個demo,后面再來說說這種實現(xiàn)的原理。
1、demo
引入pom
org.redisson redisson 3.13.1
封裝了一個RedissonDelayQueue類
@Component @Slf4j publicclassRedissonDelayQueue{ privateRedissonClientredissonClient; privateRDelayedQueuedelayQueue; privateRBlockingQueue blockingQueue; @PostConstruct publicvoidinit(){ initDelayQueue(); startDelayQueueConsumer(); } privatevoidinitDelayQueue(){ Configconfig=newConfig(); SingleServerConfigserverConfig=config.useSingleServer(); serverConfig.setAddress("redis://localhost:6379"); redissonClient=Redisson.create(config); blockingQueue=redissonClient.getBlockingQueue("SANYOU"); delayQueue=redissonClient.getDelayedQueue(blockingQueue); } privatevoidstartDelayQueueConsumer(){ newThread(()->{ while(true){ try{ Stringtask=blockingQueue.take(); log.info("接收到延遲任務(wù):{}",task); }catch(Exceptione){ e.printStackTrace(); } } },"SANYOU-Consumer").start(); } publicvoidofferTask(Stringtask,longseconds){ log.info("添加延遲任務(wù):{}延遲時間:{}s",task,seconds); delayQueue.offer(task,seconds,TimeUnit.SECONDS); } }
這個類在創(chuàng)建的時候會去初始化延遲隊列,創(chuàng)建一個RedissonClient對象,之后通過RedissonClient對象獲取到RDelayedQueue和RBlockingQueue對象,傳入的隊列名字叫SANYOU,這個名字無所謂。
當(dāng)延遲隊列創(chuàng)建之后,會開啟一個延遲任務(wù)的消費線程,這個線程會一直從RBlockingQueue中通過take方法阻塞獲取延遲任務(wù)。
添加任務(wù)的時候是通過RDelayedQueue的offer方法添加的。
controller類,通過接口添加任務(wù),延遲時間為5s
@RestController publicclassRedissonDelayQueueController{ @Resource privateRedissonDelayQueueredissonDelayQueue; @GetMapping("/add") publicvoidaddTask(@RequestParam("task")Stringtask){ redissonDelayQueue.offerTask(task,5); } }
啟動項目,添加任務(wù)
靜靜等待5s,成功獲取到任務(wù)。
2、實現(xiàn)原理
如下圖就是上面demo中,一個延遲隊列會在Redis內(nèi)部使用到的channel和數(shù)據(jù)類型
SANYOU前面的前綴都是固定的,Redisson創(chuàng)建的時候會拼上前綴。
redisson_delay_queue_timeout:SANYOU,sorted set數(shù)據(jù)類型,存放所有延遲任務(wù),按照延遲任務(wù)的到期時間戳(提交任務(wù)時的時間戳 + 延遲時間)來排序的,所以列表的最前面的第一個元素就是整個延遲隊列中最早要被執(zhí)行的任務(wù),這個概念很重要
redisson_delay_queue:SANYOU,list數(shù)據(jù)類型,也是存放所有的任務(wù),但是研究下來發(fā)現(xiàn)好像沒什么用。。
SANYOU,list數(shù)據(jù)類型,被稱為目標(biāo)隊列,這個里面存放的任務(wù)都是已經(jīng)到了延遲時間的,可以被消費者獲取的任務(wù),所以上面demo中的RBlockingQueue的take方法是從這個目標(biāo)隊列中獲取到任務(wù)的
redisson_delay_queue_channel:SANYOU,是一個channel,用來通知客戶端開啟一個延遲任務(wù)
有了這些概念之后,再來看看整體的運行原理圖
生產(chǎn)者在提交任務(wù)的時候?qū)⑷蝿?wù)放到redisson_delay_queue_timeout:SANYOU中,分?jǐn)?shù)就是提交任務(wù)的時間戳+延遲時間,就是延遲任務(wù)的到期時間戳
客戶端會有一個延遲任務(wù),為了區(qū)分,后面我都說是客戶端延遲任務(wù)。這個延遲任務(wù)會向Redis Server發(fā)送一段lua腳本,Redis執(zhí)行l(wèi)ua腳本中的命令,并且是原子性的
這段lua腳本主要干了兩件事:
將到了延遲時間的任務(wù)從redisson_delay_queue_timeout:SANYOU中移除,存到SANYOU這個目標(biāo)隊列
獲取到redisson_delay_queue_timeout:SANYOU中目前最早到過期時間的延遲任務(wù)的到期時間戳,然后發(fā)布到redisson_delay_queue_channel:SANYOU這個channel中
當(dāng)客戶端監(jiān)聽到redisson_delay_queue_channel:SANYOU這個channel的消息時,會再次提交一個客戶端延遲任務(wù),延遲時間就是消息(最早到過期時間的延遲任務(wù)的到期時間戳)- 當(dāng)前時間戳,這個時間其實也就是redisson_delay_queue_channel:SANYOU中最早到過期時間的任務(wù)還剩余的延遲時間。
此處可以等待10s,好好想想。。
這樣,一旦時間來到了上面說的最早到過期時間任務(wù)的到期時間戳,redisson_delay_queue_timeout:SANYOU中上面說的最早到過期時間的任務(wù)已經(jīng)到期了,客戶端的延遲任務(wù)也同時到期,于是開始執(zhí)行l(wèi)ua腳本操作,及時將到了延遲時間的任務(wù)放到目標(biāo)隊列中。然后再次發(fā)布剩余的延遲任務(wù)中最早到期的任務(wù)到期時間戳到channe中,如此循環(huán)往復(fù),一直運行下去,保證redisson_delay_queue_timeout:SANYOU中到期的數(shù)據(jù)能及時放到目標(biāo)隊列中。
所以,上述說了一大堆的主要的作用就是保證到了延遲時間的任務(wù)能夠及時被放到目標(biāo)隊列。
這里再補(bǔ)充兩個特殊情況,圖中沒有畫出:
第一個就是如果redisson_delay_queue_timeout:SANYOU是新添加的任務(wù)(隊列之前有或者沒有任務(wù))是隊列中最早需要被執(zhí)行的,也會發(fā)布消息到channel,之后就按時上面說的流程走了。
添加任務(wù)代碼如下,也是通過lua腳本來的
第二種特殊情況就是項目啟動的時候會執(zhí)行一次客戶端延遲任務(wù)。項目在重啟時,由于沒有客戶端延遲任務(wù)的執(zhí)行,可能會出現(xiàn)redisson_delay_queue_timeout:SANYOU隊列中有到期但是沒有被放到目標(biāo)隊列的可能,重啟就執(zhí)行一次就是為了保證到期的數(shù)據(jù)能被及時放到目標(biāo)隊列中。
3、與第一種方案比較
現(xiàn)在來比較一下第一種方案和Redisson的這種方案,看看有沒有第一種方案的那些坑。
第一個任務(wù)延遲的問題,Redisson方案理論上是沒有延遲的,但是當(dāng)消息數(shù)量增加,消費者消費緩慢這個情況下可能會導(dǎo)致延遲任務(wù)消費的延遲。
第二個丟消息的問題,Redisson方案很大程度上減輕了丟消息的可能性,因為所有的任務(wù)都是存在list和sorted set兩種數(shù)據(jù)類型中,Redis有持久化機(jī)制,就算Redis宕機(jī)了,也就可能會丟一點點數(shù)據(jù)。
第三個廣播消費任務(wù)的問題,這個是不會出現(xiàn)的,因為每個客戶端都是從同一個目標(biāo)隊列中獲取任務(wù)的。
第四個問題是Redis內(nèi)部channel發(fā)布事件的問題,跟這種方案不沾邊,就更不可能存在了。
所以,通過上面的對比可以看出,Redisson這種實現(xiàn)方案就顯得更加的靠譜了。
審核編輯:劉清
-
數(shù)據(jù)庫
+關(guān)注
關(guān)注
7文章
3816瀏覽量
64448 -
lua腳本
+關(guān)注
關(guān)注
0文章
21瀏覽量
7590 -
Redis
+關(guān)注
關(guān)注
0文章
376瀏覽量
10882
原文標(biāo)題:用 Redis 實現(xiàn)延遲隊列,我研究了兩種方案,發(fā)現(xiàn)并不簡單
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論