本文主要介紹了一個在 GreptimeDB 中遇到的一個關于異步取消 (async cancellation) 的“奇怪”問題[1]。
The Problem
針對這個問題,我們首先描述一個簡化的場景:在一個長時間運行的測試中存在元信息損壞的問題,有一個應該單調遞增的序列號出現了重復。
序列號的更新邏輯非常簡單:從一個原子變量中讀取當前值,然后通過異步 I/O 方法 persist_number()將新值寫入文件里,最后更新這個原子變量。整個流程都是串行化的(file 是一個獨占引用)。
asyncfnupdate_metadata(file:&mutFile,counter:AtomicU64)->Result<()>{
letnext_number=counter.load(Ordering::Relaxed)+1;
persist_number(file,next_number).await?;
counter.fetch_add(1,Ordering::Relaxed);
}
由于一些原因,我們在這里使用了 load 函數而非 fetch_add(雖然單就這里來說可以用fetch_add,并且用了還不會引發這次的問題)。當這個更新流程在中間出現錯誤時,我們不希望更新內存中的計數器。我們清楚如果persist_number() 寫入文件時失敗就能夠從 ?提前返回,并且會提早結束執行來傳播錯誤,所以編碼的時候會注意這些問題。
但是到了 .await 這里事情就變得奇妙了起來,因為 async cancellation 帶來了一個隱藏的控制流。
Async Cancellation
async task and runtime
如果這時候你已經猜到了到底是什么引發了這個問題,可以跳過這一章節。如果沒有,就讓我從一些偽代碼開始解釋在 await point 那里到底發生了什么,以及 runtime 是如何參與其中的。
-
poll_future
首先是poll_future,對應到Future的 poll[2] 方法。我們寫的異步方法都會被轉化成類似這樣子的一個匿名的Future實現。
fnpoll_future()->FutureOutput{
matchstatus_of_the_task{
Ready(output)=>{
//thetaskisfinished,andwehaveitoutput.
//somelogic
returnour_output;
},
Pending=>{
//itisnotready,wedon'thavetheoutput.
//thuswecannotmakeprogressandneedtowait
returnPending;
}
}
}
?
async塊通常包含其他的異步方法,比如update_metadata和persist_number。這里把persist_number稱為update_metadata的子異步任務。每個.await都會被展開成類似poll_future的東西,等待子任務的結果并繼續執行。在這個例子中就是等待persist_number的結果返回Ready再更新計數器,否則不更新。
[2] poll:
https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll
-
runtime
第二段偽代碼是一個簡化的 runtime,它負責輪詢 (poll) 異步任務直到它們完成(考慮到接下來的文章內容,“直到……完成”這種表述并不適合于所有情況)。在 GreptimeDB 中我們使用tokio[3] 作為 runtime。現在的異步 runtime 可能有很多特性和功能,其中最基礎的就是輪詢這些任務。
fnruntime(&self){
loop{
letfuture_tasks:Vec=self.get_tasks();
fortaskintasks{
matchtask.poll_future(){
Ready(output)=>{
//thistaskisfinished.wakeitwiththeresult
task.wake(output);
},
Pending=>{
//thistaskneedssometimetorun.pollitlater
self.poll_later(task);
}
}
}
}
}
通過結合上述兩個簡化的 future 和 runtime 模型,我們得到如下這個循環(真實的 runtime 非常復雜,這里為了內容集中省略了很多)。
fnrun()->Output{
loop{
ifletReady(result)=task.poll(){
returnresult;
}
}
}
需要強調的是,每個 .await 都代表著一個或者多個函數調用 (調用到 poll() 或者說是 poll_future() )。這就是標題中“隱藏的控制流”,以及 cancellation 發生的地方。
我們再看一段簡單的程序來探測 runtime 的行為(可以直接在 playground[4]里面運行這段代碼):
usetokio::{sleep,Duration,timeout};
#[tokio::main]
asyncfnmain(){
letf=async{
print(1).await;
println!("1isdone");
print(2).await;
println!("2isdone");
print(3).await;
println!("3isdone");
};
ifletErr(_)=timeout(Duration::from_millis(150),f).await{
println!("timeout");
}
sleep(Duration::from_millis(300)).await;
println!("exit")
}
asyncfnprint(val:u32){
sleep(Duration::from_millis(100)).await;
println!("valis{}",val);
}
只要花幾分鐘時間猜測一下上方代碼的輸出結果,如果和下面的一致,相信你已經知道問題出在哪里。
valis1
1isdone
timeout
exit
之后的語句都因為超時而被 runtime 取消執行了。
這個問題其中的原理并不復雜,但是(對我來說)能夠定位到它并不輕易。在把其他問題都排除掉之后我知道問題就發生在這里,在這個 .await 上。也許是太多次成功的異步函數調用麻痹了注意,亦或是我的心智模型中沒有把這兩點聯系起來,聯想到這點著實費了一番心思。
[3]tokio:
https://docs.rs/tokio/latest/tokio/
[4]playground:
https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=40220605392e951e833a0b45719ed1e1
cancellation
目前為止的內容是問題復盤的標準流程,接下來,讓我們來展開討論一下 cancellation,它是與 runtime 的行為相關的。
雖然 Rust 中的很多 runtime 都有類似的行為,但是這不是一個必須的特性,比如這個自己寫的runtime[5] 就不支持 cancellation。因為問題發生在 tokio 上,因此這里會以它為例,而其他的 runtime 也是類似的。在 tokio 中,可以使用 JoinHandle::abort()[6] 來取消一個 task。task 結構中有一個“cancel marker bit”來跟蹤一個任務是否被取消了。如果它發現一個 task 被取消了,就會停止執行這個 task。
//Ifthetaskisrunning,wemarkitascancelled.Thethread
//runningthetaskwillnoticethecancelledbitwhenit
//stopspollinganditwillkillthetask.
//
//Theset_notified()callisnotstrictlynecessarybutitwill
//insomecasesletawake_by_refcallreturnwithouthaving
//toperformacompare_exchange.
snapshot.set_notified();
snapshot.set_cancelled();
背后的邏輯也很簡單,就是 runtime 放棄了繼續輪詢你的 task,就和 ? 差不多。某種程度上可能更棘手一點,因為我們不能像 Err 那樣處理這個 cancellation。不過這代表我們需要考慮每一個 .await都有可能隨時被 cancel 掉嗎?這也太麻煩了。
以本文這個 metadata 更新的情況為例,如果把 cancel 納入考慮范圍,我們需要檢查文件是否和內存中的狀態一致,如果不一致就要回滾持久化的改動等等。壞消息是,在某些方面答案是肯定的,runtime 可以對你的 future 做任何事情。不過好在大多數情況下它們都還是很遵守規矩的。
[5]試驗 runtime:
https://github.com/waynexia/texn
[6] JoinHandle::abort():
https://docs.rs/tokio/latest/tokio/task/struct.JoinHandle.html#method.abort
Current Solution
explicit detach
現在是否有手段能防止 task 被取消呢?在 tokio 中我們可以通過 drop JoinHandle來 detach 一個任務到后臺。一個 detached task 意味著沒有前臺的 handle 來控制這個任務,從某種意義上來說也就使得其他人不能在外面套一層timeout或select,從而間接地使它不會被取消執行。并且開頭提到的問題就是通過這種方式解決的。
JoinHandle detaches the associated task when it is dropped, which means that there is no longer any handle to the task, and no way to joinon it.
不過雖然有辦法能夠實現這個功能,是否像 glommio[7]一樣有一個顯式的 detach 方法,類似一個不返回 JoinHandle 的 spawn 方法會更好。但這些都是瑣碎的事情,一個 runtime 通常不會完全沒有理由就取消一個 task,并且在大多數情況下都是出于用戶的要求,只不過有時候可能沒有注意到,就像 select 中的那些“未選中的分支”或者 tonic 中請求處理的邏輯那樣。所以如果我們確定一個 task 是不能被取消的話,顯式地 detach 可能能預防某些悲劇的發生。
Our solution
目前為止所有問題都清晰了,讓我們開始修復這個 bug 吧!
首先,為什么我們的 future 會被取消呢?通過函數調用鏈路很容易就能發現整個處理過程都是在tonic的請求執行邏輯中就地執行的,而對于一個網絡請求來說有一個超時行為是很常見的。解決方案也很簡單,就是將服務器處理邏輯提交到另一個 runtime 中執行,從而防止它被取消。只需要幾行代碼[8]就能完成。
@@-30,12+40,24@@implBatchHandler{
}
batch_resp.admins.push(admin_resp);
-fordb_reqinbatch_req.databases{
-forobj_exprindb_req.exprs{
-letobject_resp=self.query_handler.do_query(obj_expr).await?;
-db_resp.results.push(object_resp);
+let(tx,rx)=oneshot::channel();
+letquery_handler=self.query_handler.clone();
+let_=self.runtime.spawn(asyncmove{
+//executerequestinanotherruntimetopreventtheexecutionfrombeingcancelledunexpectedbytonicruntime.
+letmutresult=vec![];
+fordb_reqinbatch_req.databases{
+forobj_exprindb_req.exprs{
+letobject_resp=query_handler.do_query(obj_expr).await;
+
+result.push(object_resp);
+}
}
這個問題到這里就修復完了,不過并不是從根本上解決 async cancellation 帶來的 bug,而是采用間接手段去規避任務由于超時而被提前取消的問題,畢竟我們的這些異步邏輯還是需要被完整執行的。
但是這樣的處理會放大另外一些問題,比如我們也無法提前取消掉對于已經不用執行或資源消耗特別大的任務,從而導致系統資源的浪費。這些是我們之后需要持續改進的地方。接下來會就這方面繼續展開,從 async 生態的方面討論有哪些可能能提升 async cancellation 的使用體驗。
[7] glommio's:
https://docs.rs/glommio/0.7.0/glommio/struct.Task.html#method.detach
[8] 解決方案代碼:
https://github.com/GreptimeTeam/greptimedb/pull/376/files#diff-9756dcef86f5ba1d60e01e41bf73c65f72039f9aaa057ffd03f3fc2f7dadfbd0R46-R54
Runtime Behavior
-
marker trait
首先,我們自然希望 runtime 不要無條件地取消我的 task,而是嘗試通過類型系統來變得更友好,比如借助類似 CancelSafe 的 marker trait 。對于 cancellation safety 這個詞,tokio 在它的文檔[9]中有提到:
To determine whether your own methods are cancellation safe, look for the location of uses of.await. This is because when an asynchronous method is cancelled, that always happens at an.await. If your function behaves correctly even if it is restarted while waiting at an.await, then it is cancellation safe.
簡單來說就是用來描述一個 task 是否可以安全地被取消掉,這是 async task 的屬性之一。tokio 維護了一個很長的列表,列出了哪些是安全的以及哪些是不安全的。看起來這和UnwindSafe[10]這個 marker trait 很像。兩者都是描述“這種控制流程并不總是被預料到的”,并且“有可能導致一些微妙的 bug”的這樣一種屬性。
如果有這樣一個 CancelSafe 的 trait,我們就有途徑可以告訴 runtime 我們的異步任務是否可以安全地被取消掉,同時也是一種方式讓用戶承諾 cancelling 這個控制流程是被仔細處理過的。如果發現沒有實現這個 trait,那就意味著我們不希望這個 task 被取消掉,簡單而清晰。以 timeout()為例:
///Themarkertrait
traitCancelSafe{}
///Onlycancellabletaskcanbetimeout-ed
pubfntimeout(duration:Duration,future:F)->Timeoutwhere
F:Future+CancelSafe
{}
-
volunteer cancel
另一個方式是讓任務自愿地取消。就像 Kotlin 中的 cooperative cancellation[11] 一樣,它有一個 isActive 方法來檢查一個 task 是否被取消掉。這只是一個檢測方法,是否要取消完全取決于 task 本身。下面是 Kotlin 文檔中的一個例子, cooperative cancellation 發生在第 5 行。這種方式把“隱藏的控制流程”放在了明面上,讓我們能以一種更自然的方式來考慮和處理 cancellation,就像 Option 或 Result 一樣。
valstartTime=System.currentTimeMillis()
valjob=launch(Dispatchers.Default){
varnextPrintTime=startTime
vari=0
while(isActive){//cancellablecomputationloop
//printamessagetwiceasecond
if(System.currentTimeMillis()>=nextPrintTime){
println("job:I'msleeping${i++}...")
nextPrintTime+=500L
}
}
}
delay(1300L)//delayabit
println("main:I'mtiredofwaiting!")
job.cancelAndJoin()//cancelsthejobandwaitsforitscompletion
println("main:NowIcanquit.")
并且我認為這也不難實現,Tokio 現在已經有了 Cancelled bit[12] 和 CancellationToken,只是看起來和期望的還有點不一樣。最后還是需要 runtime 把 cancellation 的權利交給 task,否則情況可能沒有什么大的不同。
[9] tokio 文檔:
https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety
[10]UnwindSafe:
https://doc.rust-lang.org/std/panic/trait.UnwindSafe.html
[11]cooperative cancellation:
https://kotlinlang.org/docs/cancellation-and-timeouts.html#cancellation-is-cooperative
[12]Cancelledbit:
https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/is-active.html
異步取消問題作為 Rust 語言中一個經典問題,暫時還沒有很完美的解法。如果你正被這個問題困擾,或者有一些自己的見解,歡迎在評論區留言。期待隨著社區生態的共同努力,很快能共建一個更優的解決方案~
關于 Greptime
Greptime 格睿科技于2022年創立,目前正在完善和打造時序性數據庫 GreptimeDB 和格睿云 Greptime Cloud 這兩款產品。GreptimeDB 是款用 Rust 語言編寫的時序數據庫。具有分布式,開源,云原生,兼容性強等特點,幫助企業實時讀寫,處理和分析時序數據的同時降低長期存儲的成本。
-
官網:https://greptime.com/
-
GitHub:https://github.com/GreptimeTeam/greptimedb
-
文檔:https://docs.greptime.com/
-
Twitter:https://twitter.com/Greptime
-
Slack:https://greptime.com/slack
-
LinkedIn:https://www.linkedin.com/company/greptime/
審核編輯 :李倩
-
數據庫
+關注
關注
7文章
3795瀏覽量
64364 -
函數
+關注
關注
3文章
4329瀏覽量
62575 -
Rust
+關注
關注
1文章
228瀏覽量
6601
原文標題:看不見的控制流 — Rust 異步取消的幾點思考
文章出處:【微信號:Rust語言中文社區,微信公眾號:Rust語言中文社區】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論