本文介紹了協程的概念,并討論了 Tars Cpp 協程的實現原理和源碼分析。
一、前言
Tars 是 Linux 基金會的開源項目,它是基于名字服務使用 Tars 協議的高性能 RPC 開發框架,配套一體化的運營管理平臺,并通過伸縮調度,實現運維半托管服務。Tars 集可擴展協議編解碼、高性能 RPC 通信框架、名字路由與發現、發布監控、日志統計、配置管理等于一體,通過它可以快速用微服務的方式構建自己的穩定可靠的分布式應用,并實現完整有效的服務治理。
Tars 目前支持 C++,Java,PHP,Nodejs,Go 語言,其中 TarsCpp 3.x 全面啟用對協程的支持,服務框架全面融合協程。本文基于TarsCpp-v3.0.0版本,討論了協程在TarsCpp服務框架的實現。
二、協程的介紹
2.1 什么是協程
協程的概念最早出現在Melvin Conway在1963年的論文("Design of a separable transition-diagram compiler"),協程認為是“可以暫停和恢復執行”的函數。
協程可以看成一種特殊的函數,相比于函數,協程最大的特點就是支持掛起(yield)和恢復(resume)的能力。如上圖所示:函數不能主動中斷執行流;而協程支持主動掛起,中斷執行流,并在一定時機恢復執行。
協程的作用:
降低并發編碼的復雜度,尤其是異步編程(callback hell)。
協程在用戶態中實現調度,避免了陷入內核,上下文切換開銷小。
2.2 進程、線程和協程
我們可以簡單的認為協程是用戶態的線程。協程和線程主要異同:
相同點:都可以實現上下文切換(保存和恢復執行流)
不同點:線程的上下文切換在內核實現,切換的時機由內核調度器控制。協程的上下文切換在用戶態實現,切換的時機由調用方自身控制。
進程、線程和協程的比較:
2.3 協程的分類
按控制傳遞(Control-transfer)機制分為:對稱(Symmetric)協程和非對稱(Asymmetric)協程。
對稱協程:協程之間相互獨立,調度權(CPU)可以在任意協程之間轉移。協程只有一種控制傳遞操作(yield)。對稱協程一般需要調度器支持,通過調度算法選擇下一個目標協程。
非對稱協程:協程之間存在調用關系,協程讓出的調度權只能返回給調用者。協程有兩種控制操作:恢復(resume)和掛起(yield)。
下圖演示了對稱協程的調度權轉移流程,協程只有一個操作yield,表示讓出CPU,返回給調度器。
對稱協程示意圖
下圖演示了非對稱協程的調度權轉移流程。協程可以有兩個操作,即resume和yield。resume表示轉移CPU給被調用者,yield表示被調用者返回CPU給調用者。
非對稱協程示意圖
根據協程是否有獨立的棧空間,協程分為有棧協程(stackful)和無棧協程(stackless)兩種。
有棧協程:每個協程有獨立的棧空間,保存獨立的上下文(執行棧、寄存器等),協程的喚醒和掛起就是拷貝和切換上下文。優點:協程調度可以嵌套,在內存中的任意位置、任意時刻進行。局限:協程數目增大,內存開銷增大。
無棧協程:單個線程內所有協程都共享同一個棧空間(共享棧),協程的切換就是簡單的函數調用和返回,無棧協程通常是基于狀態機或閉包來實現。優點:減小內存開銷。局限:協程調度產生的局部變量都在共享棧上, 一旦新的協程運行后共享棧中的數據就會被覆蓋, 先前協程的局部變量也就不再有效, 進而無法實現參數傳遞、嵌套調用等高級協程交互。
Golang 中的 goroutine、Lua 中的協程都是有棧協程;ES6的 await/async、Python 的 Generator、C++20 中的 cooroutine 都是無棧協程。
三、Tars 協程實現
實現協程的核心有兩點:
實現用戶態的上下文切換。
實現協程的調度。
Tars 協程的由下面幾個類實現:
TC_CoroutineInfo 協程信息類:實現協程的上下文切換。每個協程對應一個 TC_CoroutineInfo 對象,上下文切換基于boost.context實現。
TC_CoroutineScheduler 協程調度器類:實現了協程的管理和調度。
TC_Coroutine 協程類:繼承于線程類(TC_Thread),方便業務快速使用協程。
Tars 協程有幾個特點:
有棧協程。每個協程都分配了獨立的棧空間。
對稱協程。協程之間相互獨立,由調度器負責調度。
基于 epoll 實現協程調度,和網絡IO無縫結合。
3.1 用戶態上下文切換的實現方式
協程可以看成一種特殊的函數,和普通函數不同,協程函數有掛起(yield)和恢復(resume)的能力,即可以中斷自己的執行流,并且在合適的時候恢復執行流,這也稱為上下文切換的能力。
協程執行的過程,依賴兩個關鍵要素:協程棧和寄存器,協程的上下文環境其實就是寄存器和棧的狀態。實現上下文切換的核心就是實現保存并恢復當前執行環境的寄存器狀態的能力。
實現用戶態上下文切換一般有以下方式:
3.2 基于boost.context實現上下文切換
Tars 協程是基于 boost.context 實現,boost.context 提供了兩個接口(make_fcontext, jump_fcontext)實現協程的上下文切換。
代碼1:
/** * @biref 執行環境上下文 */ typedef void* fcontext_t; /** * @biref 事件參數包裝 */ struct transfer_t { fcontext_t fctx; // 來源的執行上下文。來源的上下文指的是從什么位置跳轉過來的 void* data; // 接口傳入的自定義的指針 }; /** * @biref 初始化執行環境上下文 * @param sp 棧空間地址 * @param size 棧空間的大小 * @param fn 入口函數 * @return 返回初始化完成后的執行環境上下文 */ extern "C" fcontext_t make_fcontext(void * stack, std::size_t stack_size, void (* fn)( transfer_t)); /** * @biref 跳轉到目標上下文 * @param to 目標上下文 * @param vp 目標上下文的附加參數,會設置為transfer_t里的data成員 * @return 跳轉來源 */ extern "C" transfer_t jump_fcontext(fcontext_t const to, void * vp);
(1)make_fcontext創建協程
接受三個參數,stack是為協程分配的棧底,stack_size是棧的大小,fn是協程的入口函數
返回初始化完成后的執行環境上下文
(2)jump_fcontext切換協程
接受兩個參數,目標上下文地址和參數指針
返回一個上下文,指向當前上下文從哪個上下文跳轉過來
fcontext的結構
boost context 是通過 fcontext_t結構體來保存協程狀態。相對于其它匯編實現的協程庫,boost的context和stack是一起的,棧底指針就是context,切換context就是切換stack。
3.3 Tars協程信息類
TC_CoroutineInfo 協程信息類,包裝了 boost.context 提供的接口,表示一個 TARS 協程。
其中,TC_CoroutineInfo::registerFunc 定義了協程的創建。
代碼2:
void TC_CoroutineInfo::registerFunc(const std::function& callback) { _callback = callback; _init_func.coroFunc = TC_CoroutineInfo::corotineProc; _init_func.args = this; fcontext_t ctx = make_fcontext(_stack_ctx.sp, _stack_ctx.size, TC_CoroutineInfo::corotineEntry); // 創建協程 transfer_t tf = jump_fcontext(ctx, this); // context 切換 //實際的ctx this->setCtx(tf.fctx); } void TC_CoroutineInfo::corotineEntry(transfer_t tf) { TC_CoroutineInfo * coro = static_cast< TC_CoroutineInfo * >(tf.data); // this auto func = coro->_init_func.coroFunc; void* args = coro->_init_func.args; transfer_t t = jump_fcontext(tf.fctx, NULL); //拿到自己的協程堆棧, 當前協程結束以后, 好跳轉到main coro->_scheduler->setMainCtx(t.fctx); //再跳轉到具體函數 func(args, t); }
TC_CoroutineInfo::switchCoro 定義了協程切換。
代碼3
void TC_CoroutineScheduler::switchCoro(TC_CoroutineInfo *to) { //跳轉到to協程 _currentCoro = to; transfer_t t = jump_fcontext(to->getCtx(), NULL); //并保存協程堆棧 to->setCtx(t.fctx); }
四、Tars 協程調度器
基于 boost.context 的 TC_CoroutineInfo 類實現了協程的上下文切換,協程的管理和調度,則是由 TC_CoroutineScheduler 協程調度器類來負責,分管理和調度兩個方面來說明 TC_CoroutineScheduler 調度類。
協程管理:目的是需要合理的數據結構來組織協程(TC_CoroutineInfo),方便調度的實現。
協程調度:目的是控制協程的啟動、休眠和喚醒,實現了 yield, sleep 等功能,本質就是實現協程的狀態機,完成協程的狀態切換。Tars 協程分為 5 個狀態:FREE, ACTIVE, AVAIL, INACTIVE, TIMEOUT
代碼4:
/** * 協程的狀態信息 */ enum CORO_STATUS { CORO_FREE = 0, CORO_ACTIVE = 1, CORO_AVAIL = 2, CORO_INACTIVE = 3, CORO_TIMEOUT = 4 };
4.1 Tars 協程的管理
TC_CoroutineScheduler 主要通過以下方法管理協程:
TC_CoroutineScheduler::create()
創建TC_CoroutineScheduler對象
TC_CoroutineScheduler::init()初始化,分配協程棧內存
TC_CoroutineScheduler::run()啟動調度
TC_CoroutineScheduler::terminate()停止調度
TC_CoroutineScheduler::destroy()資源銷毀,釋放協程棧內存
我們可以通過 TC_CoroutineScheduler::init()看到數據結構的初始化過程。
代碼5:
void TC_CoroutineScheduler::init() { ... .... createCoroutineInfo(_poolSize); // _all_coro = new TC_CoroutineInfo*[_poolSize+1]; TC_CoroutineInfo::CoroutineHeadInit(&_active); TC_CoroutineInfo::CoroutineHeadInit(&_avail); TC_CoroutineInfo::CoroutineHeadInit(&_inactive); TC_CoroutineInfo::CoroutineHeadInit(&_timeout); TC_CoroutineInfo::CoroutineHeadInit(&_free); int iSucc = 0; for(size_t i = 0; i < _currentSize; ++i) { //iId=0不使用, 給mainCoro使用!!!! uint32_t iId = generateId(); stack_context s_ctx = stack_traits::allocate(_stackSize); // 分配協程棧內存 TC_CoroutineInfo *coro = new TC_CoroutineInfo(this, iId, s_ctx); _all_coro[iId] = coro; TC_CoroutineInfo::CoroutineAddTail(coro, &_free); ++iSucc; } _currentSize = iSucc; _mainCoro.setUid(0); _mainCoro.setStatus(TC_CoroutineInfo::CORO_FREE); _currentCoro = &_mainCoro; }
通過下面的 TC_CoroutineScheduler 調度類數據結構圖,可以更清楚的看到協程的組織方式:
Tars調度類數據結構
使用協程之前,需要在協程數組(_all_coro),創建指定數量的協程對象,并為每個協程分配協程棧內存。
通過鏈表的方式管理協程,每個狀態都有一個鏈表。協程狀態切換,對應協程在不同狀態鏈表的轉移。
4.2Tars 協程的調度
Tars 調度是基于epoll實現,在 epoll 循環里檢查是否有需要執行的協程, 有則執行之, 沒有則等待在epoll對象上, 直到有喚醒或者超時。使用 epoll 實現的好處是可以和網絡IO無縫粘合, 當有數據發送/接收時, 喚醒epoll對象, 從而完成協程的切換。
Tars協程調度的核心邏輯是
TC_CoroutineScheduler::run()
代碼6:
void TC_CoroutineScheduler::run() { ... ... while(!_epoller->isTerminate()) { if(_activeCoroQueue.empty() && TC_CoroutineInfo::CoroutineHeadEmpty(&_avail) && TC_CoroutineInfo::CoroutineHeadEmpty(&_active)) { _epoller->done(1000); // epoll_wait(..., 1000ms) 先處理epoll的網絡事件 } //喚醒需要激活的協程 wakeup(); //喚醒sleep的協程 wakeupbytimeout(); //喚醒yield的協程 wakeupbyself(); int iLoop = 100; //執行active協程, 每次執行100個, 避免占滿cpu while(iLoop > 0 && !TC_CoroutineInfo::CoroutineHeadEmpty(&_active)) { TC_CoroutineInfo *coro = _active._next; switchCoro(coro); --iLoop; } //執行available協程, 每次執行1個 if(!TC_CoroutineInfo::CoroutineHeadEmpty(&_avail)) { TC_CoroutineInfo *coro = _avail._next; switchCoro(coro); } } ... ... }
下圖可以更清楚得看到協程調度和狀態轉移的過程。
Tars協程調度狀態轉移圖
TC_CoroutineScheduler 提供了下面四種方法實現協程的調度:
(1) TC_CoroutineScheduler:啟動協程。
(2)TC_CoroutineScheduler:當前協程放棄繼續執行。并提供了兩種方式,支持不同的喚醒策略。
yield(true):會自動喚醒(等到下次協程調度,都會再激活當前線程)
yield(false):不再自動喚醒,除非自己調度該協程(比如put到調度器中)
(3)TC_CoroutineScheduler:當前協程休眠iSleepTime時間(單位:毫秒),然后會被喚醒繼續執行。
(4)TC_CoroutineScheduler:放入需要喚醒的協程, 將協程放入到調度器中, 馬上會被調度器調度。
五、總結
本文介紹了協程的概念,并討論了 Tars Cpp 協程的實現原理和源碼分析。
審核編輯:劉清
-
RPC
+關注
關注
0文章
111瀏覽量
11540 -
C++語言
+關注
關注
0文章
147瀏覽量
7008 -
Lua語言
+關注
關注
0文章
9瀏覽量
1506 -
調度器
+關注
關注
0文章
98瀏覽量
5261
原文標題:Tars-Cpp協程實現分析
文章出處:【微信號:OSC開源社區,微信公眾號:OSC開源社區】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論