色哟哟视频在线观看-色哟哟视频在线-色哟哟欧美15最新在线-色哟哟免费在线观看-国产l精品国产亚洲区在线观看-国产l精品国产亚洲区久久

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

一種基于MySQL實現的stream隊列

jf_ro2CN3Fa ? 來源:稀土掘金 ? 2023-11-24 09:53 ? 次閱讀

EMS

Extend MySQL Stream;一種基于 MySQL 實現的 stream 隊列。

功能

集群消費、廣播消費

自動重試、死信隊列

快速重置消息位點,快速回放消息,快速查詢消息

消息可基于磁盤積壓、消息可快速清理

監控 group 積壓,topic 消息量排行,消息鏈路追蹤,消息消費超時告警;

讀寫性能 1200-3000 QPS 左右

寫入設計

msg id 就是 topic 維度的自增 id,可對多個 topic 并發寫入

針對一個 topic,需要有物理 physics offset, 每次寫入,topic 維度的 physics_offset 自增加一

如果使用 redis 自增特性實現, 為每個 topic 配置一個自增 key, 則可以避免加鎖.

redis 實現雖然性能好, 如為配置aof,宕機則可能導致丟失數據, 此時,會出現 offset 重復異常, 過一會隨著繼續自增, 也就恢復了.

寫入需要上鎖嗎? 看怎么寫, 如果使用非原子的形式自增 id,比如數據的的方式,先查出最大 id,再加一,那么必須加鎖

topic 維度的自增 id 如果使用 mysql 實現, 性能不堪受辱,因此,此處使用 redis 自增實現(可配置為 mysql 實現);

經過測試,筆記本電腦,單 topic 20 并發寫入,qps 在 1000-1500 左右(local mysql & local redis),基本滿足業務需求。

考慮到高可用性和業務場景,此處無法使用批量插入

所有的 topic 和 msg 都寫入的這一張表中,表數據定時清理,消費完的消息,可提前刪除。

注意,本方案寫入性能瓶頸是 MySQL Server 的性能瓶頸。

讀取設計

假設針對一個 topic,只有一個 consumer,只需循環讀取,然后更新 offset 即可。

但結合實際業務場景,這種基本不存在,所以,忽略這種場景。

通常,一個 topic 有多個 consumer group(簡稱 tg), 一個 consumer group 有多個 client(jvm or thread

如果一個 topic + group(簡稱 tg),有多個 consumer,每個 consumer 有多個線程,讀取和更新 offset 則會有并發問題, 如下圖。

這個 client id,我們將其設計為,ip + pid + uuid + thread id;

ip 和 pid 可幫助我們追溯問題

uuid 簡單防重復

thread id,一種性能優化,下面細說。

結合實際業務場景,且遵循 simple is better 原則,讀取時,使用上鎖的方式解決并發問題。鎖的粒度就是 tg

考慮到要實現基本的順序讀取和防止重復消費,多線程并發時,我們應當實現基于自增的形式讀取 msg;每個 clientid 讀取消息后,都會記錄一個簡單的log,并在 tg 維度增加一個 max offset

每次讀取消息時,每個 client 都需要去檢查當前想要讀取的 tg 是否已經有【其他 client】在操作 max offset。即,我們將鎖的粒度縮小到了 max offset;

對這個 tg 維度的 max offset + n

批量插入這個 tg + clientid offset log,表明這個消息被這個 clientid 讀取了,同時也間接更新了 max offset(order by offset)

釋放鎖

拉取剛剛讀取的 msg id list 里面的消息體

交給業務處理消息

整體原則是,一個 t + g 的 max offset,同時只能有一個 thread 操作(寫和更新)

如果有其他人在讀取,則阻塞

如果沒有其他人在讀取,則鎖住這個 tg, 并批量拉取一定數量的消息 id,

fe6133ce-8a69-11ee-939d-92fbcf53809c.jpg

ack

對于集群消息,如何保證在斷電情況下,消息不丟失,使用數據庫存儲消息, 寫入即不會丟失, 但消費時, 如果剛剛讀進內存就立刻宕機,則需要在重啟時恢復消息.

每個 client get 到消息后,都需要記錄 msg pid,consumer group,state(start、done,retry)為 start 狀態

ack success,將 log update 為 done 狀態

ack fail 后,將 log update 為 retry 狀態,同時將消息存入重試隊列

如果 client 還存活,超過 1 分鐘(可配),則將其撈出,放進重試隊列,并在 10s 進行第一次重試

如果 client 還存活,則立刻將其撈出,放進重試隊列,并在 10s 進行第一次重試

這里需要上鎖嗎?其實是不需要的,因為更新的維度是 client id 的 log,不存在并發更新. 這里更新狀態是表示這些消息已經處理結束了,否則無法判定宕機場景。

對于 start 狀態的消息,定時任務會去檢查

ack 是批量的,ack 失敗,僅會導致重復消費。

廣播消息

是否為廣播消息由 topic 確定

廣播消息不需要上鎖,每一個訂閱該 topic 的 client 都會讀取到該消息

廣播消息不需要 ack,不需要記錄成功或失敗或重試,僅需要內存里記錄 offset

推薦盡可能使用集群模式,使用集群模式模擬廣播模式

client id

只有 consumer 需要 client id

client id 由 ip pid uuid + thread id 組成, 可溯源.

client id 需要續約(5s),如果機器宕機,則會被自動清除,且他的 start 狀態的消息會進入重試隊列,交給同 group 的其他 client

client id 可以自己主動注銷,注銷前,自己內存的消息應當被優雅消費結束,一般來講,kill -15 的 jvm 都會主動注銷 client id;

核心表設計

topic 表:記錄topic 元信息

group 表:記錄 group 訂閱元信息

msg 表:msg總表,記錄寫入的信息,包含 body 和 topic 維度的自增 offset,類似 rocketmq commit log

該表會被多個 consumer 消費的消息

該表會被定制刪除過期數據

retry msg 表,消費失敗、超時的消息,會進入該表,并按階梯定時消費

dead msg 表,消費重試 16(any config) 次的消息,會進入該表

topic_group_log 表:記錄 consumer group client 的 msg 消費記錄,包含 state(start、done,retry) 字段,可 ack

該表的記錄行數會非常多,單行數據較少,可自動刪除 done 的記錄

如上文所說,由于本方案未采用常見的多 queue 和多 partition 的設計,因此瓶頸在于上圖提到的分布式鎖的設計上,具體鏈路為 consumer group client 在集群消費時, 為了讓并發讀取的 thread 拉取到的消息盡可能準確,使用上鎖的方式來實現。

總體看下來,可以簡單理解為,ems 失去了性能,卻擁有了所有。







審核編輯:劉清

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • MySQL
    +關注

    關注

    1

    文章

    815

    瀏覽量

    26605
  • QPS
    QPS
    +關注

    關注

    0

    文章

    24

    瀏覽量

    8812

原文標題:如何設計一款基于 MySQL 實現的 Message Queue

文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    Stream API原理介紹

    Stream API 是 Java 8 中最重要的新特性之,它是處理集合和數組的一種新方式。它提供了一種簡單、靈活和可讀的方式來處理集合和數組中的元素,從而使代碼更加簡潔、高效和易于
    的頭像 發表于 09-30 15:31 ?720次閱讀

    聊消息隊列技術選型的7消息場景

    我們在做消息隊列的技術選型時,往往會結合業務場景進行考慮。今天來聊聊消息隊列可能會用到的 7 消息場景。
    的頭像 發表于 12-09 17:50 ?1394次閱讀
    聊<b class='flag-5'>一</b>聊消息<b class='flag-5'>隊列</b>技術選型的7<b class='flag-5'>種</b>消息場景

    Redis Stream應用案例

    的IoT設備會形成巨大的數據洪流,采集完成后在云端進行分析,產生巨大的用戶價值。這些數據雖然內容各個不同,但是都有個共同的特點,都是一種時序數據。看到這里,你可能會突然發現,Redis Stream
    發表于 06-26 17:15

    怎樣去設計一種采用覆蓋機制的FIFO隊列模型呢

    FIFO隊列是什么?怎樣去設計一種采用覆蓋機制的FIFO隊列模型呢?
    發表于 12-08 06:07

    如何利用Java swing mysql實現一種電影票訂票管理系統呢

    Java swing mysql實現的電影票訂票管理系統,主要實現的功能有:用戶端:登錄注冊、查看電影信息、選擇影院場次、選座購票、查看自己的影票、評價電影等功能。管理員:登錄、電影管理、影院管理
    發表于 01-03 06:23

    實現隊列環形緩沖的方法

    串口隊列環形緩沖區隊列串口環形緩沖的好處代碼實現隊列??要實現隊列環形緩沖,還需要
    發表于 02-21 07:11

    如何去實現一種隊列程序的設計呢

    隊列的原理是什么?隊列有何作用?如何去實現一種隊列程序的設計呢?
    發表于 02-25 07:50

    一種改進的主動隊列管理算法

    主動隊列管理是實現網絡擁塞控制的重要技術,但是多數主動隊列管理算法如隨機早期檢(RED)都存在對參數依賴性強的問題。針對RED算法中平均隊列長度不能完全反映網絡擁塞狀況的
    發表于 04-13 09:08 ?14次下載

    一種高效的磁盤隊列I/O機制

    分析了傳統磁盤隊列的存儲管理開銷和讀寫性能,針對磁盤隊列I/O已成為影響消息服務器性能的首要瓶頸,提出了一種高效磁盤隊列I/O機制—FlashQ。FlashQ采用物理上連續的磁盤塊
    發表于 05-14 19:51 ?32次下載

    一種基于速率的公平隊列管理算法

    針對主動隊列管理算法普遍存在的公平性問題,提出基于速率的公平隊列管理算法RFED。該算法根據分組的到達速率調節丟包率,將隊列的到達速率控制在鏈路的服務速率下,根據
    發表于 10-04 14:11 ?15次下載

    深度解析數據結構與算法篇之隊列及環形隊列實現

    的位置。 02 — 環形隊列實現 要想將元素放入隊列我們必須知道對頭和隊尾,在隊列長度不能無限大的條件下我們還要知道隊列的最大容量,我們還
    的頭像 發表于 06-18 10:07 ?1947次閱讀

    隊列實現棧原理是什么?隊列實現棧方案有哪幾種?

    棧是一種后進先出的數據結構,而隊列一種先進先出的數據結構,兩者原理不難理解,使用也簡單。
    的頭像 發表于 07-04 13:28 ?2759次閱讀
    <b class='flag-5'>隊列</b><b class='flag-5'>實現</b>棧原理是什么?<b class='flag-5'>隊列</b><b class='flag-5'>實現</b>棧方案有哪幾種?

    TencentOS-tiny中環形隊列實現

    1. 什么是隊列隊列(queue)是一種只能在端插入元素、在另端刪除元素的數據結構,遵循「先入先出」(FIFO)的規則。 隊列中有兩個基
    的頭像 發表于 10-08 16:30 ?1390次閱讀

    一種基于單片機實現隊列功能模塊

    基于單片機實現隊列功能模塊,主要用于8位、16位、32位非運行RTOS的單片機應用,兼容大多數單片機平臺。
    的頭像 發表于 08-14 11:09 ?854次閱讀
    <b class='flag-5'>一種</b>基于單片機<b class='flag-5'>實現</b>的<b class='flag-5'>隊列</b>功能模塊

    嵌入式環形隊列與消息隊列實現原理

    嵌入式環形隊列,也稱為環形緩沖區或循環隊列,是一種先進先出(FIFO)的數據結構,用于在固定大小的存儲區域中高效地存儲和訪問數據。其主要特點包括固定大小的數組和兩個指針(頭指針和尾指針),分別指向
    的頭像 發表于 09-02 15:29 ?543次閱讀
    主站蜘蛛池模板: 亚洲欧美一区二区三区蜜芽| 日日啪无需播放器| 欧美ZC0O人与善交的最新章节| 亚洲精品国产高清不卡在线| 光棍天堂在线a| 日韩精品一卡二卡三卡四卡2021 | 99热国产这里只有精品6| 久久成人国产精品一区二区| 亚洲高清国产拍精品5g| 国产在线观看www鲁啊鲁免费| 色综合久久中文色婷婷| 国产a在线不卡| 丝袜美女被啪啪不带套漫画| 工口肉肉彩色不遮挡| 无码人妻精品国产婷婷| 国产色综合色产在线视频| 失禁h啪肉尿出来高h| 国产成人自产拍免费视频| 视频专区亚洲欧美日韩| 国产不卡免费| 亚洲haose在线观看| 九九热视频免费观看| 中文字幕乱码亚洲无线三区| 男欢女爱免费视频| yellow免费观看直播| 日韩欧美一区二区中文字幕 | 欧美gay老头互吃| 99久久国产综合精品| 欧美gay69| 刺激一区仑乱| 亚洲国产精品无码中文在线| 国拍自产精品福利区| 在线 国产 欧美 专区| 免费人妻无码AV不卡在线| ae58老司机福利| 手机在线观看你懂的| 国产一区二区高清| 中文字幕久久久| 日本一本免费线观看视频| 国产在线观看99| 99视频在线国产|