01、簡(jiǎn)介
Xline 是一款開(kāi)源的分布式 KV 存儲(chǔ)引擎,用于管理少量的關(guān)鍵性數(shù)據(jù),其核心目標(biāo)是實(shí)現(xiàn)高性能的數(shù)據(jù)訪問(wèn),以及保證跨數(shù)據(jù)中心場(chǎng)景下的強(qiáng)一致性。 Xline 對(duì)外提供了一系列兼容 etcd 的訪問(wèn)接口,比如 KV、Watch、Lease 等等。本文將會(huì)著重介紹一下其中的 Lease 接口 。
Lease 是一種客戶端和服務(wù)端之間的租約機(jī)制。類(lèi)似于我們現(xiàn)實(shí)生活中的租車(chē)服務(wù),當(dāng)我們需要使用一輛車(chē)時(shí),我們可以向租車(chē)公司申請(qǐng)一個(gè) lease,租車(chē)公司會(huì)給我們分配一輛車(chē),并且保證在我們和租車(chē)公司約定的有效期內(nèi)不會(huì)把這輛車(chē)分配給其他人,如果我們想要延長(zhǎng)使用時(shí)間,我們可以向租車(chē)公司續(xù)租,如果我們不再需要使用這輛車(chē),我們可以主動(dòng)歸還并取消,或者等待 lease 過(guò)期后自動(dòng)歸還。
在 Xline 中對(duì) lease 的使用和現(xiàn)實(shí)生活中的租車(chē)服務(wù)很相似,客戶端可以向服務(wù)點(diǎn)申請(qǐng)一個(gè) lease,服務(wù)端會(huì)保證在 lease 的有效期內(nèi)不會(huì)刪除這個(gè) lease,客戶端也可以通過(guò)相應(yīng)的接口提前結(jié)束或者延長(zhǎng) lease 的時(shí)間,與現(xiàn)實(shí)中租車(chē)不同的是,我們可以在這個(gè) lease 上綁定一些 key-value,這些 key-value 會(huì)隨著 lease 的過(guò)期被刪除。
根據(jù)以上介紹的 lease 的能力,我們可以在很多場(chǎng)景下使用 lease 來(lái)實(shí)現(xiàn)我們的目的,以下是幾個(gè)常見(jiàn)的 lease 應(yīng)用場(chǎng)景:
- 分布式鎖: 分布式鎖是通過(guò)多個(gè)機(jī)制一同實(shí)現(xiàn)的,lease 在分布式鎖中起到避免死鎖的作用。客戶端在請(qǐng)求分布式鎖的時(shí)候,會(huì)創(chuàng)建一個(gè) lease 并不斷續(xù)租,并且寫(xiě)入 key-value 并附加該 lease,這個(gè) key-value 代表分布式鎖的占用狀態(tài),如果占用該鎖的客戶端因故障無(wú)法主動(dòng)釋放鎖,lease 機(jī)制也會(huì)保證在 lease 過(guò)期后自動(dòng)刪除對(duì)應(yīng)的 key-value 來(lái)釋放當(dāng)前鎖。
- 服務(wù)注冊(cè)中心: 注冊(cè)新服務(wù)時(shí)創(chuàng)建 lease,并寫(xiě)入服務(wù)相關(guān)信息的 key-value 附加該 lease,在服務(wù)存活期間,對(duì)應(yīng)服務(wù)會(huì)一直對(duì)其 lease 續(xù)租,服務(wù)故障后無(wú)法自動(dòng)續(xù)租,對(duì)應(yīng) key-value 自動(dòng)刪除,相應(yīng)的服務(wù)就會(huì)在注冊(cè)中心中注銷(xiāo)。
- 分布式系統(tǒng)中的授權(quán)管理: 客戶端通過(guò)申請(qǐng) lease 來(lái)獲取資源的訪問(wèn)權(quán)限,如果客戶端失去與服務(wù)端的連接,或者由于故障沒(méi)有及時(shí)續(xù)租,導(dǎo)致 lease 過(guò)期,該客戶端就會(huì)失去相應(yīng)的權(quán)限
02、架構(gòu)
上圖是一個(gè) lease 實(shí)現(xiàn)的簡(jiǎn)單架構(gòu)圖,外部 Client 可以通過(guò)兩種方式向Xline集群發(fā)送請(qǐng)求,一種是直接通過(guò) Curp
協(xié)議向集群內(nèi)所有節(jié)點(diǎn)廣播請(qǐng)求,Curp
模塊達(dá)成共識(shí)后,會(huì)把這個(gè)請(qǐng)求應(yīng)用到狀態(tài)機(jī),也就是將其寫(xiě)入存儲(chǔ)層;另一種發(fā)送請(qǐng)求的方式就是 Client 直接將請(qǐng)求發(fā)送到集群中一個(gè)節(jié)點(diǎn)的 LeaseServer
,這也是與 etcd 兼容的請(qǐng)求方式,請(qǐng)求到達(dá) LeaseServer
后,會(huì)有兩條不同的處理路徑,多數(shù)請(qǐng)求會(huì)通過(guò) Server 端綁定的 Curp client 廣播給集群中所有節(jié)點(diǎn),剩下的少部分請(qǐng)求可能只有部分節(jié)點(diǎn)能夠處理,這些請(qǐng)求就會(huì)被轉(zhuǎn)發(fā)到這些節(jié)點(diǎn)的 LeaseServer
,然后應(yīng)用到狀態(tài)機(jī)。
03、源碼分析
源碼組織
Lease 相關(guān)的源碼主要保存在以下文件中,大致分為三個(gè)部分:
- RPC 定義:
xlineapi/proto/rpc.proto
:Xline 內(nèi)各 Server 的 rpc 接口定義,包括 LeaseServer接口定義。xlineapi/proto/lease.proto
:lease 的 rpc message 定義。
- LeaseServer實(shí)現(xiàn):
xline/src/server/lease_server.rs
:負(fù)責(zé)提供 Lease RPC service 的具體實(shí)現(xiàn),主要目的是提供 etcd 兼容接口,如果使用外部的 curp client 直接發(fā)送 propose 可以不經(jīng)過(guò)此接口,但也有部分不經(jīng)過(guò)共識(shí)協(xié)議的請(qǐng)求必須通過(guò) LeaseServer 處理。
LeaseStore實(shí)現(xiàn):xline/src/storage/lease_store/lease.rs
:定義了Lease
數(shù)據(jù)結(jié)構(gòu),用于保存 Lease相關(guān)的信息,比如 Lease 上綁定的所有 Key, Lease 的過(guò)期時(shí)間,Lease 的剩余 TTL 長(zhǎng)度等。并為其實(shí)現(xiàn)了一些實(shí)用的方法。xline/src/storage/lease_store/lease_queue.rs
:定義了LeaseQueue
和相關(guān)的方法,LeaseQueue
是一個(gè)由 lease id 以及 lease 過(guò)期時(shí)間組成的優(yōu)先隊(duì)列,一個(gè)后臺(tái)常駐 task 會(huì)定時(shí)通過(guò)此結(jié)構(gòu)獲取所有過(guò)期 lease 的 id。xline/src/storage/lease_store/lease_collection.rs
:定義了LeaseCollection
和相關(guān)的方法,LeasCollection
是 lease 核心數(shù)據(jù)結(jié)構(gòu)的集合,提供 lease 機(jī)制的核心能力。結(jié)構(gòu)內(nèi)部主要包含三個(gè)部分,lease_map
保存所有 lease 結(jié)構(gòu);item_map
緩存 key 到 lease id 映射;expired_queue
管理 lease 過(guò)期時(shí)間,expired_queue
只在 leader 節(jié)點(diǎn)上有意義,其它節(jié)點(diǎn)上為空。xline/src/storage/lease_store/mod.rs
:LeaseStore
的定義及方法實(shí)現(xiàn)。負(fù)責(zé)提供 lease 的存儲(chǔ)層抽象,對(duì)外提供所有 lease 相關(guān)操作的存儲(chǔ)層接口。其內(nèi)部包含LeaseCollection
以及和KvStore
共享的一些數(shù)據(jù)結(jié)構(gòu)。
Lease 的創(chuàng)建
想要使用 lease,首先就要?jiǎng)?chuàng)建一個(gè) lease,創(chuàng)建 lease 時(shí)需要使用 LeaseServer
提供的 LeaseGrant
接口。LeaseServer
中對(duì) LeaseGrant
的處理很簡(jiǎn)單,就是分配一個(gè) lease id,然后通過(guò) propose 把請(qǐng)求交給共識(shí)協(xié)議處理,達(dá)成共識(shí)后,請(qǐng)求會(huì)在 LeaseStore
中被執(zhí)行。
LeaseStore
會(huì)在 LeaseCollection
中創(chuàng)建并插入一個(gè)新的 Lease
,其核心代碼邏輯如下:
...if is_leader { let expiry = lease.refresh(Duration::ZERO); let _ignore = inner.expired_queue.insert(lease_id, expiry);} else { lease.forever();}let _ignore = inner.lease_map.insert(lease_id, lease.clone());...
需要注意的是,如果當(dāng)前節(jié)點(diǎn)是 leader 節(jié)點(diǎn)的話,還需要承擔(dān)管理 lease 過(guò)期時(shí)間的任務(wù),所以需要通過(guò)refresh
方法計(jì)算 Lease
的過(guò)期時(shí)間,并將其插入到 expired_queue
中。其他節(jié)點(diǎn)則不需要這一步處理,只需要將新的 Lease
插入到 lease_map
中。計(jì)算過(guò)期時(shí)間使用的 refresh
定義如下:
Lease 創(chuàng)建完成后,服務(wù)端會(huì)給客戶端返回一個(gè)包含 lease id 的響應(yīng)。
Lease的使用
獲取到 lease id 后,客戶端就可以通過(guò) lease id 來(lái)使用這個(gè) lease,在 Put 一對(duì) key value 時(shí)可以附加 lease id,這個(gè) Put 請(qǐng)求被應(yīng)用到狀態(tài)機(jī)時(shí),除了直接在 KvStore
的 Index
和 DB
中寫(xiě)入 key-value 以外,還會(huì)通過(guò)LeaseCollection
提供的 detach
方法分離當(dāng)前 key 和舊的 lease ,并通過(guò) attach
將需要 put 的 key 附加到新的 lease id 上。
pub(crate) fn attach(&self, lease_id: i64, key: Vec< u8 >) - > Result< (), ExecuteError > { let mut inner = self.inner.write(); let Some(lease) = inner.lease_map.get_mut(&lease_id) else { return Err(ExecuteError::lease_not_found(lease_id)); }; lease.insert_key(key.clone()); let _ignore = inner.item_map.insert(key, lease_id); Ok(())}
attach
的具體實(shí)現(xiàn)就是通過(guò) lease id
找到對(duì)應(yīng)的 Lease
,并將 key 附加到 Lease
上,以及在 item_map
中添加 key 到 lease id 的映射關(guān)系。detach
的實(shí)現(xiàn)與 attach
的相反,它會(huì)移除 attach
時(shí)插入的內(nèi)容。
經(jīng)過(guò)以上的過(guò)程,我們已經(jīng)成功將 key 和 lease id 關(guān)聯(lián)在一起,此時(shí)如果這個(gè) Lease
被主動(dòng) revoke 或者超時(shí),那么這個(gè) Lease
以及它關(guān)聯(lián)的所有 key,都會(huì)被刪除。
Lease 的主動(dòng)刪除
刪除一個(gè) lease 需要調(diào)用 LeaseRevoke
接口,這個(gè)接口在 LeaseServer
中的處理與 LeaseGrant
基本相同,都是將請(qǐng)求交給共識(shí)協(xié)議處理,唯一的不同是 LeaseRevoke
不需要分配 lease id。
let del_keys = match self.lease_collection.look_up(req.id) { Some(l) = > l.keys(), None = > return Err(ExecuteError::lease_not_found(req.id)),};if del_keys.is_empty() { let _ignore = self.lease_collection.revoke(req.id); return Ok(Vec::new());}// delete keys ...let _ignore = self.lease_collection.revoke(req.id);
LeaseRevoke
被執(zhí)行時(shí),首先會(huì)嘗試查找 Lease
是否有關(guān)聯(lián)的 key,如果沒(méi)有,那么就可以直接通過(guò) LeaseCollection
上的 revoke
方法將 Lease
刪除,如果有關(guān)聯(lián)的 key 的話那么就需要將關(guān)聯(lián)的所有 key 從 KvStore
中刪除,并清理 LeaseCollection
中這些 key 和 lease id 的關(guān)系,然后才能從 LeaseCollection
中 reovke
這個(gè) Lease
。
Lease 的過(guò)期
Lease 過(guò)期時(shí)的處理流程如上圖所示,此處省略了共識(shí)的部分,在初始化 LeaseServer
時(shí),會(huì)創(chuàng)建一個(gè)后臺(tái)常駐的 revoke_expired_leases_task
,這個(gè) task 的主體代碼如下:
loop { // only leader will check expired lease if lease_server.lease_storage.is_primary() { for id in lease_server.lease_storage.find_expired_leases() { let _handle = tokio::spawn({ let s = Arc::clone(&lease_server); async move { let request = tonic::Request::new(LeaseRevokeRequest { id }); if let Err(e) = s.lease_revoke(request).await { warn!("Failed to revoke expired leases: {}", e); } } }); } } time::sleep(DEFAULT_LEASE_REQUEST_TIME).await;}
在負(fù)責(zé)管理 Lease
過(guò)期時(shí)間節(jié)點(diǎn)上,這個(gè) task 會(huì)定時(shí)通過(guò) find_expired_leases
獲取已經(jīng)過(guò)期的所有 lease id, 然后調(diào)用 lease server 上的 lease_revoke
接口來(lái)刪除過(guò)期的 Lease
,這個(gè)接口和客戶度主動(dòng)刪除 Lease
時(shí)使用的是同一個(gè)接口。
find_expired_leases
是 LeaseCollection
上一個(gè)核心方法,具體實(shí)現(xiàn)如下:
pub(crate) fn find_expired_leases(&self) - > Vec< i64 > { let mut expired_leases = vec![]; let mut inner = self.inner.write(); while let Some(expiry) = inner.expired_queue.peek() { if *expiry <= Instant::now() { #[allow(clippy::unwrap_used)] // queue.peek() returns Some let id = inner.expired_queue.pop().unwrap(); if inner.lease_map.contains_key(&id) { expired_leases.push(id); } } else { break; } } expired_leases}
在創(chuàng)建 Lease
時(shí),我們已經(jīng)計(jì)算過(guò)了Lease
過(guò)期的時(shí)間并將其插入了 expired_queue
,調(diào)用 find_expired_queue
時(shí)會(huì)一直嘗試從優(yōu)先隊(duì)列隊(duì)頭拿出已經(jīng)過(guò)期的 Lease
,直到遇到第一個(gè)不過(guò)期的 Lease
后停止嘗試,然后將拿到的所有 lease id 返回。
Lease 的續(xù)租
如果想要讓創(chuàng)建的 Lease
能夠持續(xù)更長(zhǎng)時(shí)間,那就需要在客戶端和服務(wù)端之間維護(hù)一條 stream,客戶端定時(shí)向服務(wù)端發(fā)送 LeaseKeepAlive
請(qǐng)求。和前面提到的請(qǐng)求不同,LeaseKeepAlive
請(qǐng)求不需要經(jīng)過(guò)共識(shí)協(xié)議,因?yàn)檫@個(gè)請(qǐng)求依賴只存在于 leader 節(jié)點(diǎn)上的 Lease
過(guò)期時(shí)間,因此只有 leader 節(jié)點(diǎn)能夠處理 LeaseKeepAlive 請(qǐng)求,follower 節(jié)點(diǎn)會(huì)把請(qǐng)求轉(zhuǎn)發(fā)至 leader 節(jié)點(diǎn)上處理。具體的轉(zhuǎn)發(fā)邏輯可以參考 lease_server.rs
內(nèi)的源碼。
在 leader 和 client 建立起 stream 后,每當(dāng) leader 從 stream 中收到 lease id,都會(huì)為這個(gè) lease 續(xù)租,最終續(xù)租的邏輯是通過(guò) LeaseCollection
提供的 renew
方法實(shí)現(xiàn)的。該方法定義如下:
pub(crate) fn renew(&self, lease_id: i64) - > Result< i64, ExecuteError > { let mut inner = self.inner.write(); let (expiry, ttl) = { let Some(lease) = inner.lease_map.get_mut(&lease_id) else { return Err(ExecuteError::lease_not_found(lease_id)); }; if lease.expired() { return Err(ExecuteError::lease_expired(lease_id)); } let expiry = lease.refresh(Duration::default()); let ttl = lease.ttl().as_secs().cast(); (expiry, ttl) }; let _ignore = inner.expired_queue.update(lease_id, expiry); Ok(ttl)}
Renew 會(huì)先檢查對(duì)應(yīng) Lease
是否已經(jīng)過(guò)期,沒(méi)有過(guò)期的話就會(huì)重新計(jì)算過(guò)期時(shí)間,然后更新它在 expired_queue
中的順序。
只要 client 和 server 之間的連接不中斷,client 就會(huì)一直通過(guò) stream 向服務(wù)端發(fā)送 LeaseKeepAlive
請(qǐng)求,這個(gè) lease 也就不會(huì)超時(shí),前文提到的 lease 主要的應(yīng)用場(chǎng)景中,幾乎都用到了這個(gè)特性來(lái)判斷客戶端是否在正常運(yùn)行。
Lease 信息的讀取
Lease 有兩個(gè)讀取接口,一個(gè)是 LeaseTimeToLive
,這個(gè)接口會(huì)讀取一個(gè) lease 的詳細(xì)信息,包括它的過(guò)期時(shí)間,和 LeaseKeepAlive
一樣,因?yàn)檫^(guò)期時(shí)間只存在于 leader 節(jié)點(diǎn),因此該請(qǐng)求需要轉(zhuǎn)發(fā)只 leader 處理;另一個(gè)讀取接口是 LeaseLeases
,這個(gè)接口會(huì)列出系統(tǒng)中所有的 lease id,這個(gè)接口不需要 lease 過(guò)期時(shí)間的信息,因此可以直接交給共識(shí)協(xié)議處理,所以在 LeaseServer
中的處理和 LeaseGrant
、LeaseRevoke
相似。此處不再贅述。
LeaseTimeToLive
和 LeaseLeases
讀取信息的能力最終由 LeaseCollection
實(shí)現(xiàn),源碼如下:
pub(crate) fn look_up(&self, lease_id: i64) - > Option< Lease > { self.inner.read().lease_map.get(&lease_id).cloned()} pub(crate) fn leases(&self) - > Vec< Lease > { let mut leases = self .inner .read() .lease_map .values() .cloned() .collect::< Vec< _ >>(); leases.sort_by_key(Lease::remaining); leases}
04、總結(jié)
本文介紹了 Xline 下的一個(gè)重要接口 Lease,用戶可以通過(guò) Lease 實(shí)現(xiàn)一組 key 的定時(shí)過(guò)期,并且能夠通過(guò) KeepAlive 接口為 Lease 續(xù)租,服務(wù)端也能夠根據(jù)此特性探測(cè)客戶端是否在正常運(yùn)作。依賴于 Lease 機(jī)制的這些特點(diǎn),也誕生出了很多典型的應(yīng)用場(chǎng)景,比如本文介紹過(guò)的分布式鎖、服務(wù)注冊(cè)中心,授權(quán)管理等等。
-
RPC
+關(guān)注
關(guān)注
0文章
111瀏覽量
11534
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論