01、簡介
Xline 是一款開源的分布式 KV 存儲引擎,用于管理少量的關鍵性數(shù)據(jù),其核心目標是實現(xiàn)高性能的數(shù)據(jù)訪問,以及保證跨數(shù)據(jù)中心場景下的強一致性。 Xline 對外提供了一系列兼容 etcd 的訪問接口,比如 KV、Watch、Lease 等等。本文將會著重介紹一下其中的 Lease 接口 。
Lease 是一種客戶端和服務端之間的租約機制。類似于我們現(xiàn)實生活中的租車服務,當我們需要使用一輛車時,我們可以向租車公司申請一個 lease,租車公司會給我們分配一輛車,并且保證在我們和租車公司約定的有效期內(nèi)不會把這輛車分配給其他人,如果我們想要延長使用時間,我們可以向租車公司續(xù)租,如果我們不再需要使用這輛車,我們可以主動歸還并取消,或者等待 lease 過期后自動歸還。
在 Xline 中對 lease 的使用和現(xiàn)實生活中的租車服務很相似,客戶端可以向服務點申請一個 lease,服務端會保證在 lease 的有效期內(nèi)不會刪除這個 lease,客戶端也可以通過相應的接口提前結(jié)束或者延長 lease 的時間,與現(xiàn)實中租車不同的是,我們可以在這個 lease 上綁定一些 key-value,這些 key-value 會隨著 lease 的過期被刪除。
根據(jù)以上介紹的 lease 的能力,我們可以在很多場景下使用 lease 來實現(xiàn)我們的目的,以下是幾個常見的 lease 應用場景:
- 分布式鎖: 分布式鎖是通過多個機制一同實現(xiàn)的,lease 在分布式鎖中起到避免死鎖的作用。客戶端在請求分布式鎖的時候,會創(chuàng)建一個 lease 并不斷續(xù)租,并且寫入 key-value 并附加該 lease,這個 key-value 代表分布式鎖的占用狀態(tài),如果占用該鎖的客戶端因故障無法主動釋放鎖,lease 機制也會保證在 lease 過期后自動刪除對應的 key-value 來釋放當前鎖。
- 服務注冊中心: 注冊新服務時創(chuàng)建 lease,并寫入服務相關信息的 key-value 附加該 lease,在服務存活期間,對應服務會一直對其 lease 續(xù)租,服務故障后無法自動續(xù)租,對應 key-value 自動刪除,相應的服務就會在注冊中心中注銷。
- 分布式系統(tǒng)中的授權管理: 客戶端通過申請 lease 來獲取資源的訪問權限,如果客戶端失去與服務端的連接,或者由于故障沒有及時續(xù)租,導致 lease 過期,該客戶端就會失去相應的權限
02、架構
上圖是一個 lease 實現(xiàn)的簡單架構圖,外部 Client 可以通過兩種方式向Xline集群發(fā)送請求,一種是直接通過 Curp
協(xié)議向集群內(nèi)所有節(jié)點廣播請求,Curp
模塊達成共識后,會把這個請求應用到狀態(tài)機,也就是將其寫入存儲層;另一種發(fā)送請求的方式就是 Client 直接將請求發(fā)送到集群中一個節(jié)點的 LeaseServer
,這也是與 etcd 兼容的請求方式,請求到達 LeaseServer
后,會有兩條不同的處理路徑,多數(shù)請求會通過 Server 端綁定的 Curp client 廣播給集群中所有節(jié)點,剩下的少部分請求可能只有部分節(jié)點能夠處理,這些請求就會被轉(zhuǎn)發(fā)到這些節(jié)點的 LeaseServer
,然后應用到狀態(tài)機。
03、源碼分析
源碼組織
Lease 相關的源碼主要保存在以下文件中,大致分為三個部分:
- RPC 定義:
xlineapi/proto/rpc.proto
:Xline 內(nèi)各 Server 的 rpc 接口定義,包括 LeaseServer接口定義。xlineapi/proto/lease.proto
:lease 的 rpc message 定義。
- LeaseServer實現(xiàn):
xline/src/server/lease_server.rs
:負責提供 Lease RPC service 的具體實現(xiàn),主要目的是提供 etcd 兼容接口,如果使用外部的 curp client 直接發(fā)送 propose 可以不經(jīng)過此接口,但也有部分不經(jīng)過共識協(xié)議的請求必須通過 LeaseServer 處理。
LeaseStore實現(xiàn):xline/src/storage/lease_store/lease.rs
:定義了Lease
數(shù)據(jù)結(jié)構,用于保存 Lease相關的信息,比如 Lease 上綁定的所有 Key, Lease 的過期時間,Lease 的剩余 TTL 長度等。并為其實現(xiàn)了一些實用的方法。xline/src/storage/lease_store/lease_queue.rs
:定義了LeaseQueue
和相關的方法,LeaseQueue
是一個由 lease id 以及 lease 過期時間組成的優(yōu)先隊列,一個后臺常駐 task 會定時通過此結(jié)構獲取所有過期 lease 的 id。xline/src/storage/lease_store/lease_collection.rs
:定義了LeaseCollection
和相關的方法,LeasCollection
是 lease 核心數(shù)據(jù)結(jié)構的集合,提供 lease 機制的核心能力。結(jié)構內(nèi)部主要包含三個部分,lease_map
保存所有 lease 結(jié)構;item_map
緩存 key 到 lease id 映射;expired_queue
管理 lease 過期時間,expired_queue
只在 leader 節(jié)點上有意義,其它節(jié)點上為空。xline/src/storage/lease_store/mod.rs
:LeaseStore
的定義及方法實現(xiàn)。負責提供 lease 的存儲層抽象,對外提供所有 lease 相關操作的存儲層接口。其內(nèi)部包含LeaseCollection
以及和KvStore
共享的一些數(shù)據(jù)結(jié)構。
Lease 的創(chuàng)建
想要使用 lease,首先就要創(chuàng)建一個 lease,創(chuàng)建 lease 時需要使用 LeaseServer
提供的 LeaseGrant
接口。LeaseServer
中對 LeaseGrant
的處理很簡單,就是分配一個 lease id,然后通過 propose 把請求交給共識協(xié)議處理,達成共識后,請求會在 LeaseStore
中被執(zhí)行。
LeaseStore
會在 LeaseCollection
中創(chuàng)建并插入一個新的 Lease
,其核心代碼邏輯如下:
...if is_leader { let expiry = lease.refresh(Duration::ZERO); let _ignore = inner.expired_queue.insert(lease_id, expiry);} else { lease.forever();}let _ignore = inner.lease_map.insert(lease_id, lease.clone());...
需要注意的是,如果當前節(jié)點是 leader 節(jié)點的話,還需要承擔管理 lease 過期時間的任務,所以需要通過refresh
方法計算 Lease
的過期時間,并將其插入到 expired_queue
中。其他節(jié)點則不需要這一步處理,只需要將新的 Lease
插入到 lease_map
中。計算過期時間使用的 refresh
定義如下:
Lease 創(chuàng)建完成后,服務端會給客戶端返回一個包含 lease id 的響應。
Lease的使用
獲取到 lease id 后,客戶端就可以通過 lease id 來使用這個 lease,在 Put 一對 key value 時可以附加 lease id,這個 Put 請求被應用到狀態(tài)機時,除了直接在 KvStore
的 Index
和 DB
中寫入 key-value 以外,還會通過LeaseCollection
提供的 detach
方法分離當前 key 和舊的 lease ,并通過 attach
將需要 put 的 key 附加到新的 lease id 上。
pub(crate) fn attach(&self, lease_id: i64, key: Vec< u8 >) - > Result< (), ExecuteError > { let mut inner = self.inner.write(); let Some(lease) = inner.lease_map.get_mut(&lease_id) else { return Err(ExecuteError::lease_not_found(lease_id)); }; lease.insert_key(key.clone()); let _ignore = inner.item_map.insert(key, lease_id); Ok(())}
attach
的具體實現(xiàn)就是通過 lease id
找到對應的 Lease
,并將 key 附加到 Lease
上,以及在 item_map
中添加 key 到 lease id 的映射關系。detach
的實現(xiàn)與 attach
的相反,它會移除 attach
時插入的內(nèi)容。
經(jīng)過以上的過程,我們已經(jīng)成功將 key 和 lease id 關聯(lián)在一起,此時如果這個 Lease
被主動 revoke 或者超時,那么這個 Lease
以及它關聯(lián)的所有 key,都會被刪除。
Lease 的主動刪除
刪除一個 lease 需要調(diào)用 LeaseRevoke
接口,這個接口在 LeaseServer
中的處理與 LeaseGrant
基本相同,都是將請求交給共識協(xié)議處理,唯一的不同是 LeaseRevoke
不需要分配 lease id。
let del_keys = match self.lease_collection.look_up(req.id) { Some(l) = > l.keys(), None = > return Err(ExecuteError::lease_not_found(req.id)),};if del_keys.is_empty() { let _ignore = self.lease_collection.revoke(req.id); return Ok(Vec::new());}// delete keys ...let _ignore = self.lease_collection.revoke(req.id);
LeaseRevoke
被執(zhí)行時,首先會嘗試查找 Lease
是否有關聯(lián)的 key,如果沒有,那么就可以直接通過 LeaseCollection
上的 revoke
方法將 Lease
刪除,如果有關聯(lián)的 key 的話那么就需要將關聯(lián)的所有 key 從 KvStore
中刪除,并清理 LeaseCollection
中這些 key 和 lease id 的關系,然后才能從 LeaseCollection
中 reovke
這個 Lease
。
Lease 的過期
Lease 過期時的處理流程如上圖所示,此處省略了共識的部分,在初始化 LeaseServer
時,會創(chuàng)建一個后臺常駐的 revoke_expired_leases_task
,這個 task 的主體代碼如下:
loop { // only leader will check expired lease if lease_server.lease_storage.is_primary() { for id in lease_server.lease_storage.find_expired_leases() { let _handle = tokio::spawn({ let s = Arc::clone(&lease_server); async move { let request = tonic::Request::new(LeaseRevokeRequest { id }); if let Err(e) = s.lease_revoke(request).await { warn!("Failed to revoke expired leases: {}", e); } } }); } } time::sleep(DEFAULT_LEASE_REQUEST_TIME).await;}
在負責管理 Lease
過期時間節(jié)點上,這個 task 會定時通過 find_expired_leases
獲取已經(jīng)過期的所有 lease id, 然后調(diào)用 lease server 上的 lease_revoke
接口來刪除過期的 Lease
,這個接口和客戶度主動刪除 Lease
時使用的是同一個接口。
find_expired_leases
是 LeaseCollection
上一個核心方法,具體實現(xiàn)如下:
pub(crate) fn find_expired_leases(&self) - > Vec< i64 > { let mut expired_leases = vec![]; let mut inner = self.inner.write(); while let Some(expiry) = inner.expired_queue.peek() { if *expiry <= Instant::now() { #[allow(clippy::unwrap_used)] // queue.peek() returns Some let id = inner.expired_queue.pop().unwrap(); if inner.lease_map.contains_key(&id) { expired_leases.push(id); } } else { break; } } expired_leases}
在創(chuàng)建 Lease
時,我們已經(jīng)計算過了Lease
過期的時間并將其插入了 expired_queue
,調(diào)用 find_expired_queue
時會一直嘗試從優(yōu)先隊列隊頭拿出已經(jīng)過期的 Lease
,直到遇到第一個不過期的 Lease
后停止嘗試,然后將拿到的所有 lease id 返回。
Lease 的續(xù)租
如果想要讓創(chuàng)建的 Lease
能夠持續(xù)更長時間,那就需要在客戶端和服務端之間維護一條 stream,客戶端定時向服務端發(fā)送 LeaseKeepAlive
請求。和前面提到的請求不同,LeaseKeepAlive
請求不需要經(jīng)過共識協(xié)議,因為這個請求依賴只存在于 leader 節(jié)點上的 Lease
過期時間,因此只有 leader 節(jié)點能夠處理 LeaseKeepAlive 請求,follower 節(jié)點會把請求轉(zhuǎn)發(fā)至 leader 節(jié)點上處理。具體的轉(zhuǎn)發(fā)邏輯可以參考 lease_server.rs
內(nèi)的源碼。
在 leader 和 client 建立起 stream 后,每當 leader 從 stream 中收到 lease id,都會為這個 lease 續(xù)租,最終續(xù)租的邏輯是通過 LeaseCollection
提供的 renew
方法實現(xiàn)的。該方法定義如下:
pub(crate) fn renew(&self, lease_id: i64) - > Result< i64, ExecuteError > { let mut inner = self.inner.write(); let (expiry, ttl) = { let Some(lease) = inner.lease_map.get_mut(&lease_id) else { return Err(ExecuteError::lease_not_found(lease_id)); }; if lease.expired() { return Err(ExecuteError::lease_expired(lease_id)); } let expiry = lease.refresh(Duration::default()); let ttl = lease.ttl().as_secs().cast(); (expiry, ttl) }; let _ignore = inner.expired_queue.update(lease_id, expiry); Ok(ttl)}
Renew 會先檢查對應 Lease
是否已經(jīng)過期,沒有過期的話就會重新計算過期時間,然后更新它在 expired_queue
中的順序。
只要 client 和 server 之間的連接不中斷,client 就會一直通過 stream 向服務端發(fā)送 LeaseKeepAlive
請求,這個 lease 也就不會超時,前文提到的 lease 主要的應用場景中,幾乎都用到了這個特性來判斷客戶端是否在正常運行。
Lease 信息的讀取
Lease 有兩個讀取接口,一個是 LeaseTimeToLive
,這個接口會讀取一個 lease 的詳細信息,包括它的過期時間,和 LeaseKeepAlive
一樣,因為過期時間只存在于 leader 節(jié)點,因此該請求需要轉(zhuǎn)發(fā)只 leader 處理;另一個讀取接口是 LeaseLeases
,這個接口會列出系統(tǒng)中所有的 lease id,這個接口不需要 lease 過期時間的信息,因此可以直接交給共識協(xié)議處理,所以在 LeaseServer
中的處理和 LeaseGrant
、LeaseRevoke
相似。此處不再贅述。
LeaseTimeToLive
和 LeaseLeases
讀取信息的能力最終由 LeaseCollection
實現(xiàn),源碼如下:
pub(crate) fn look_up(&self, lease_id: i64) - > Option< Lease > { self.inner.read().lease_map.get(&lease_id).cloned()} pub(crate) fn leases(&self) - > Vec< Lease > { let mut leases = self .inner .read() .lease_map .values() .cloned() .collect::< Vec< _ >>(); leases.sort_by_key(Lease::remaining); leases}
04、總結(jié)
本文介紹了 Xline 下的一個重要接口 Lease,用戶可以通過 Lease 實現(xiàn)一組 key 的定時過期,并且能夠通過 KeepAlive 接口為 Lease 續(xù)租,服務端也能夠根據(jù)此特性探測客戶端是否在正常運作。依賴于 Lease 機制的這些特點,也誕生出了很多典型的應用場景,比如本文介紹過的分布式鎖、服務注冊中心,授權管理等等。
-
RPC
+關注
關注
0文章
111瀏覽量
11585
發(fā)布評論請先 登錄
相關推薦
Faster Transformer v2.1版本源碼解讀
![Faster Transformer v2.1版本<b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>](https://file1.elecfans.com/web2/M00/A3/B5/wKgZomUJF9eAXWDQAAA5V2i2udk464.png)
OneFlow Softmax算子源碼解讀之WarpSoftmax
![OneFlow Softmax算子<b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>之WarpSoftmax](https://file1.elecfans.com/web2/M00/BB/4D/wKgZomWbT1CAWm4KAAA2IsyxqO4463.png)
OneFlow Softmax算子源碼解讀之BlockSoftmax
![OneFlow Softmax算子<b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>之BlockSoftmax](https://file1.elecfans.com/web2/M00/BC/38/wKgaomWbT5GAZdkPAAAcgecZoiU624.png)
聊聊Dubbo - Dubbo可擴展機制源碼解析
直播系統(tǒng)源碼選擇二開的好處是什么
闡述FreeRTOS系統(tǒng)中機制的實現(xiàn)原理
AP側(cè)中網(wǎng)相關的PLMN業(yè)務源碼流程解讀
風河:用“商業(yè)機制”保護開放源碼的價值
OC的消息轉(zhuǎn)發(fā)機制的深度解讀
基于EAIDK的人臉算法應用-源碼解讀(2)
openharmony源碼解讀
Xline源碼解讀(一)—初識CURP協(xié)議
![<b class='flag-5'>Xline</b><b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>(一)—初識CURP協(xié)議](https://file1.elecfans.com/web2/M00/A8/F5/wKgZomUiSVGAMeKbAAB5Fgkf3mg69.jpeg)
Xline源碼解讀(三)—CURP Server的實現(xiàn)
![<b class='flag-5'>Xline</b><b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>(三)—CURP Server的<b class='flag-5'>實現(xiàn)</b>](https://file1.elecfans.com/web2/M00/A8/F5/wKgZomUiStCACVHXAABEDoVGCv441.jpeg)
分布式系統(tǒng)中Membership Change 源碼解讀
![分布式系統(tǒng)中Membership Change <b class='flag-5'>源碼</b><b class='flag-5'>解讀</b>](https://file1.elecfans.com/web2/M00/C4/42/wKgaomXqr16AORuqAAAl7hbcU5Q712.png)
評論