workflow是搜狗開源的一個(gè)開發(fā)框架。可以滿足絕大多數(shù)日常服務(wù)器開發(fā),性能優(yōu)異,給上層業(yè)務(wù)提供了易于開發(fā)的接口,卻只用了少量的代碼,舉重若輕,而且代碼整潔干凈易讀。
搜狗官方宣傳強(qiáng)調(diào),workflow是一個(gè)異步任務(wù)調(diào)度編程范式,封裝了6種異步資源:CPU計(jì)算、GPU計(jì)算、網(wǎng)絡(luò)、磁盤I/O、定時(shí)器、計(jì)數(shù)器,以回調(diào)函數(shù)模式提供給用戶使用,概括起來實(shí)際上主要是兩個(gè)功能:1、屏蔽阻塞調(diào)用的影響,使阻塞調(diào)用的開發(fā)接口變?yōu)楫惒降模浞掷糜?jì)算資源;2、框架管理線程池,使開發(fā)者迅速構(gòu)建并行計(jì)算程序。
往往單臺(tái)機(jī)器要服務(wù)于千千萬萬終端,我們最希望服務(wù)器資源都能充分利用,然而計(jì)算資源和I/O資源天然的效率不對(duì)等,使我們不得不采用一些其他技術(shù)手段實(shí)現(xiàn)基礎(chǔ)資源充分利用。所謂I/O資源包括文件I/O和網(wǎng)絡(luò)I/O,此外很多時(shí)候我們需要定時(shí)執(zhí)行某段邏輯,同樣不希望等待時(shí)間阻塞計(jì)算資源的使用。
所以框架最基礎(chǔ)的功能,是要為上層開發(fā)人員屏蔽底層資源的不對(duì)稱,使我們可以方便的開發(fā)業(yè)務(wù)邏輯而不需要把很多精力放在底層。
如何擬合計(jì)算資源和io資源
我們希望io等待或其他阻塞的時(shí)間,cpu還能充分利用,執(zhí)行一些任務(wù)。這要求發(fā)起io的線程不能調(diào)用阻塞接口原地等待,而是要切出去,往往采用I/O多路復(fù)用或者異步I/O的方式,分別對(duì)應(yīng)reactor模型和proactor模型
對(duì)于網(wǎng)絡(luò)I/O,linux系統(tǒng)下缺乏對(duì)異步I/O的支持,即使近兩年有了iouring,支持了異步io,但性能上相對(duì)epoll未必會(huì)有多少提升,而且一切都交給系統(tǒng)調(diào)度,可控性上大大降低;另外開發(fā)難度也更大。反觀epoll,無論系統(tǒng)的支持還是相關(guān)設(shè)計(jì)模型都非常成熟了,所以近一二十年底層大都采用epoll,以reactor模式實(shí)現(xiàn),reactor統(tǒng)一處理請(qǐng)求,將就緒的任務(wù)轉(zhuǎn)給下游的處理器。根據(jù)業(yè)務(wù)不同,又有幾種不同實(shí)現(xiàn)方式,有的就單線程之內(nèi)調(diào)度,單線程循環(huán)處理(如redis),適合業(yè)務(wù)邏輯不復(fù)雜的場(chǎng)景;有的會(huì)單reactor處理請(qǐng)求,并通過消息隊(duì)列把請(qǐng)求轉(zhuǎn)發(fā)給下游多線程業(yè)務(wù)邏輯處理器處理;有的多線程多reactor處理請(qǐng)求,并通過消息隊(duì)列將任務(wù)分發(fā)給下游handler,單reactor模式可以認(rèn)為是這種模式的特例,workflow便以這種方式實(shí)現(xiàn)。
對(duì)于文件I/O,linux下有兩種異步I/O的支持,posix aio(glibcaio)和linux 原生 aio,其中前者是一個(gè)通過多線程的異步,模擬的異步io,性能極差;linux 原生 aio是真正的aio,但是要求fd只能以O(shè)_DIRECT方式打開,所以只適用于文件I/O,workflow中支持了這種方式處理文件I/O。
對(duì)于定時(shí)器,常見的方式,有的通過epoll每次阻塞設(shè)置阻塞時(shí)間,用戶態(tài)管理定時(shí)器(如redis);而epoll也支持時(shí)間事件,有的直接使用時(shí)間事件,workflow便采用這種方式。
提供給用戶的接口
計(jì)算資源得以充分利用,還需要考慮給用戶提供什么樣的接口,讓上層開發(fā)者能減少心智負(fù)擔(dān),比如,以協(xié)程的方式,讓用戶像開發(fā)串行程序一樣開發(fā)異步程序,順序的寫邏輯;亦或者是提供讓用戶注冊(cè)回調(diào)的方式開發(fā)異步程序。workflow中提出了子任務(wù)的概念,以任務(wù)的方式提供給用戶。
子任務(wù)定義了一種管理回調(diào)的方式,用串行并行來組織子任務(wù)調(diào)度。用戶可以把邏輯寫在任務(wù)里,交給框架去調(diào)度。
把阻塞的任務(wù)交給epoll去異步調(diào)用,計(jì)算任務(wù)交給線程池去異步執(zhí)行,以至于所有的任務(wù)都是異步調(diào)起的,這種設(shè)計(jì)思想,就是workflow被稱為“異步任務(wù)調(diào)度框架”的原因。
代碼分析
根據(jù)上面的分析,對(duì)一般服務(wù)器框架結(jié)構(gòu)已經(jīng)有了一個(gè)整體認(rèn)識(shí)。下面按這個(gè)順序,底層基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)——》純計(jì)算任務(wù)和Reactor層——》任務(wù)組織調(diào)度層——》用戶接口層,分四個(gè)層次逐步分析一下workflow的實(shí)現(xiàn)。
基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)
workflow使用到的基礎(chǔ)數(shù)據(jù)結(jié)構(gòu):鏈表、紅黑樹、消息隊(duì)列、線程池,workflow中這四個(gè)結(jié)構(gòu)的設(shè)計(jì)都非常的精致。
鏈表(見文件 list.h)
workflow中的鏈表貌似引自linux內(nèi)核,實(shí)現(xiàn)了一種非常非常靈活的鏈表,甚至鏈表串起的不同節(jié)點(diǎn)之間可以是不同的數(shù)據(jù)結(jié)構(gòu)
一般來說一個(gè)普通的鏈表節(jié)點(diǎn)如下:
{
ListNode * prev_ = nullptr;
ListNode * next_ = nullptr;
void * p_value_ = nullptr;
};
定義節(jié)點(diǎn)時(shí)定義好數(shù)據(jù)段p_value_,這樣的話數(shù)據(jù)結(jié)構(gòu)的實(shí)現(xiàn)就會(huì)與業(yè)務(wù)邏輯結(jié)合在一起。
這里不使用模板也實(shí)現(xiàn)了預(yù)定義獨(dú)立于業(yè)務(wù)邏輯的鏈表數(shù)據(jù)結(jié)構(gòu)。
鏈表的節(jié)點(diǎn):// 這是一個(gè)雙鏈表
list_head *next, *prev;
};
可以把鏈表嵌入到任何一個(gè)數(shù)據(jù)結(jié)構(gòu)中,
那如何通過鏈表節(jié)點(diǎn)拿到當(dāng)前所在結(jié)構(gòu)呢?
通過一個(gè)宏來實(shí)現(xiàn):
((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member)))
簡(jiǎn)單解釋下這個(gè)宏:ptr表示鏈表節(jié)點(diǎn)指針,type是當(dāng)前節(jié)點(diǎn)數(shù)據(jù)結(jié)構(gòu)類型名,member是鏈表節(jié)點(diǎn)在數(shù)據(jù)結(jié)構(gòu)中的成員名
&((type *)0)->member)把指向地址空間起點(diǎn)的指針(空指針)轉(zhuǎn)化成指向節(jié)點(diǎn)數(shù)據(jù)結(jié)構(gòu)的指針,然后取鏈表節(jié)點(diǎn)成員名,再取地址,就可以取到鏈表節(jié)點(diǎn)在這個(gè)數(shù)據(jù)結(jié)構(gòu)中的偏移量。
ptr是鏈表節(jié)點(diǎn)指針,按(char *)減去偏移量,就可以回退到結(jié)構(gòu)起始位置。再把這個(gè)位置轉(zhuǎn)化成(type *).就取到了指向當(dāng)前數(shù)據(jù)結(jié)構(gòu)的指針。
看接口甚至可以發(fā)現(xiàn),當(dāng)我想把當(dāng)前數(shù)據(jù)結(jié)構(gòu)從鏈表里刪除的時(shí)候,甚至不需要拿到鏈表,而是直接通過list_del(list_head * current_node)函數(shù)傳入當(dāng)前節(jié)點(diǎn)就可以刪除,靈活的一塌糊涂。
并且提供了遍歷鏈表的接口宏:
for (pos = (head)->next; pos != (head); pos = pos->next)
每一行代碼都極其簡(jiǎn)潔干凈,妙到毫巔!
其他鏈表基礎(chǔ)知識(shí)不多贅述。
紅黑樹(見rbtree.h/.c)
與鏈表類似,紅黑樹也使用了內(nèi)核紅黑樹。
相同的風(fēng)格,每個(gè)節(jié)點(diǎn)只有鏈接指針和節(jié)點(diǎn)顏色字段,而沒有數(shù)據(jù)。
{
struct rb_node *rb_parent;
struct rb_node *rb_right;
struct rb_node *rb_left;
char rb_color;
#define RB_RED 0
#define RB_BLACK 1
};
當(dāng)把紅黑樹node嵌入數(shù)據(jù)結(jié)構(gòu)中之后,使用同樣原理的宏,來獲取節(jié)點(diǎn)所在結(jié)構(gòu)的指針:
((type *)((char *)(ptr)-(unsigned long)(&((type *)0)->member)))
比較特別的是,由于節(jié)點(diǎn)不包含數(shù)據(jù),數(shù)據(jù)結(jié)構(gòu)不知道節(jié)點(diǎn)之間如何比較大小,所以需要用戶自己定義查找、插入函數(shù),但給出了例子。
消息隊(duì)列(見msgqueue.h/.c)
這里實(shí)現(xiàn)了一個(gè)消息隊(duì)列,也是正常的提供一個(gè)put接口,供生產(chǎn)者reactor生產(chǎn)數(shù)據(jù)插入消息,一個(gè)get接口,傳遞給下游handler消費(fèi),消息隊(duì)列有消息上限,并提供阻塞和非阻塞兩種模式,阻塞模式下,當(dāng)消息超過上限生產(chǎn)線成阻塞,等待消息小于上限了再插入。通過條件變量使沒有待處理的消息時(shí),阻塞消費(fèi)線程,于內(nèi)核態(tài)等待消息出現(xiàn)。這里的生產(chǎn)者和消費(fèi)者都是多線程的,所以需要考慮線程安全,消息隊(duì)列的常見實(shí)現(xiàn)是一個(gè)數(shù)據(jù)存儲(chǔ)段,一個(gè)鎖,一個(gè)條件變量,而workflow中的消息隊(duì)列的高妙之處就在于,他有兩個(gè)鎖,兩個(gè)條件變量,兩個(gè)數(shù)據(jù)空間,雙倍快樂。
{
size_t msg_max;
size_t msg_cnt;
int linkoff;
int nonblock;
void *head1;
void *head2;
void **get_head;
void **put_head;
void **put_tail;
pthread_mutex_t get_mutex;
pthread_mutex_t put_mutex;
pthread_cond_t get_cond;
pthread_cond_t put_cond;
};
這里使用了一個(gè)小技巧,大幅提升消息隊(duì)列性能,兩個(gè)數(shù)據(jù)段一個(gè)專門用來get,一個(gè)專門用來put,兩把鎖兩個(gè)條件變量,分別put時(shí)候和get時(shí)候使用。這樣的好處就是get和put操作之間幾乎互不干擾。put操作不會(huì)鎖消費(fèi)線程。get操作絕大多數(shù)情況下不會(huì)鎖生產(chǎn)線程。
只有當(dāng)get鏈表為空時(shí),才會(huì)把put和get全鎖住,對(duì)兩個(gè)鏈表頭進(jìn)行交換,極大的減少了生產(chǎn)線程和消費(fèi)線程之間爭(zhēng)奪鎖產(chǎn)生的相互影響。
這里還有一個(gè)點(diǎn)就是消息隊(duì)列要求節(jié)點(diǎn)是自帶鏈表字段的,并指定鏈接節(jié)點(diǎn)相對(duì)于結(jié)構(gòu)頭的偏移量(linkoff)。所以插進(jìn)來的節(jié)點(diǎn)msg的結(jié)構(gòu)是poller_result但是實(shí)際結(jié)構(gòu)是poller_node強(qiáng)轉(zhuǎn)過來的,再對(duì)比這兩個(gè)結(jié)構(gòu)體,發(fā)現(xiàn)前三個(gè)成員是一致的,而第四個(gè)成員就是鏈接節(jié)點(diǎn)。
{
int state;
int error;
struct poller_data data;
};
struct __poller_node
{
int state;
int error;
struct poller_data data;
#pragma pack(1)
union
{
struct list_head list;
struct rb_node rb;
};
#pragma pack()
...
};
線程池(見thrdpool.h/.c)
線程池實(shí)現(xiàn)的功能往往是創(chuàng)建一系列工作線程,工作線程執(zhí)行線程回調(diào)函數(shù),從消息隊(duì)列中取任務(wù)并執(zhí)行,當(dāng)消息隊(duì)列中沒有任務(wù)時(shí),等待任務(wù)出現(xiàn)。
workflow中的線程池就是這樣一個(gè)很標(biāo)準(zhǔn)的線程池,同時(shí)很靈活的讓邏輯脫離于線程池,線程回調(diào)函數(shù)并非實(shí)際要執(zhí)行的邏輯,而是從消息隊(duì)列里get出的task,是一個(gè)包含了要執(zhí)行的回調(diào)和上下文的task,線程回調(diào)函數(shù)執(zhí)行了這個(gè)task。
{
void (*routine)(void *);
void *context;
};
這樣實(shí)現(xiàn)一個(gè)效果,就是可以運(yùn)行時(shí)才動(dòng)態(tài)決定要執(zhí)行什么邏輯,即每個(gè)task可以是不同的任務(wù),靈活度大大提升。
基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)主要就這四種,這里只分析了其設(shè)計(jì)中比較可圈可點(diǎn)的部分,而沒有仔細(xì)講一些簡(jiǎn)單的基礎(chǔ)細(xì)節(jié)。
純計(jì)算任務(wù)和Reactor調(diào)度層
把阻塞的任務(wù)交給epoll去異步調(diào)用,計(jì)算任務(wù)交給線程池去異步執(zhí)行,實(shí)現(xiàn)所有任務(wù)的異步調(diào)度,下面分別看看計(jì)算任務(wù)和reactor。
純計(jì)算任務(wù)
WorkFlow由框架統(tǒng)一管理原始任務(wù)線程池,單例__ExecManager內(nèi)有一個(gè)單的封裝,優(yōu)雅的實(shí)現(xiàn)對(duì)線程池的管理。
這一層有三個(gè)新概念:
ExecQueue是一個(gè)有鎖鏈表隊(duì)列;
ExecSession的execute()接口由派生出來的任務(wù)自己去定義需要執(zhí)行的邏輯。
Executor類,創(chuàng)建并管理線程池,提供request()方法,request方法把對(duì)應(yīng)任務(wù)放入到線程池去執(zhí)行。request的參數(shù)有兩個(gè),分別是當(dāng)前session和所在的ExecQueue,如果queue里面只有這一個(gè)session,則把這個(gè)session放入Executor管理的線程池里里執(zhí)行,如果不是首個(gè)任務(wù),則只要放入隊(duì)列里就行了,線程routine會(huì)調(diào)度當(dāng)前隊(duì)列中所有的任務(wù)進(jìn)入線程池執(zhí)行,并用ExecQueue中的鎖保持隊(duì)列中任務(wù)調(diào)度的同步性。
Executor::executor_thread_routine是線程執(zhí)行routine,一共做了兩件事:
第一步會(huì)遞歸的調(diào)度所有當(dāng)前Queue中的任務(wù)進(jìn)線程池,并用ExecQueue中的鎖保持隊(duì)列中任務(wù)調(diào)度的同步性;
第二步是執(zhí)行當(dāng)前session,并由session自己保持?jǐn)?shù)據(jù)同步。
Reactor:
這里主要涉及四個(gè)文件poller.h/.c mpoller.h/.c Communicator.h/.cc CommScheduler.h/.cc
其中poller是對(duì)epoll的封裝,mpoller又集成多個(gè)poller線程;Communicator顧名思義,就是通信器,封裝了mpoller和線程池;CommScheduler是對(duì)Communicator的封裝,全局唯一,最后創(chuàng)建在__CommManager中,通過WFGlobal暴露出來。
這一層主要完成了右圖所示的工作,poller線程把epoll事件做初加工處理,生成一個(gè)poller_result,設(shè)置需要handle的類型,然后把處理結(jié)果put()進(jìn)消息隊(duì)列,給工作線程去處理。handler線程等待任務(wù),當(dāng)隊(duì)列里有任務(wù)時(shí),根據(jù)任務(wù)的operation類型做相應(yīng)處理。
poller
poller.h/.c提供了poller的創(chuàng)建、啟動(dòng)、stop、poller_add、poller_del、poller_mod和add_timer的接口。
poller_create創(chuàng)建了poller數(shù)據(jù)結(jié)構(gòu),分配了poller_node的指針數(shù)組nodes,這里的nodes是一個(gè)以fd為下標(biāo)的數(shù)組,這時(shí)候只有一個(gè)指針數(shù)組,node還沒有創(chuàng)建,node是在poller_add的時(shí)候創(chuàng)建的,創(chuàng)建node的時(shí)候會(huì)檢查監(jiān)聽的操作是否需要result,需要的話同時(shí)分配result空間。但這時(shí)候poller線程還沒有跑起來,執(zhí)行poller_start時(shí)將poller線程跑起來;poller_add、poller_del、poller_mod分別是epoll的增加節(jié)點(diǎn)、刪除節(jié)點(diǎn)、改變監(jiān)聽事件 三種操作的簡(jiǎn)單封裝;add_timer增加時(shí)間事件,
前面說過消息隊(duì)列里面裝的是poller_result(poller_node),poller_result里面都會(huì)有一個(gè)poller_data。
#define PD_OP_WRITE 2
#define PD_OP_LISTEN 3
#define PD_OP_CONNECT 4
#define PD_OP_SSL_READ PD_OP_READ
#define PD_OP_SSL_WRITE PD_OP_WRITE
#define PD_OP_SSL_ACCEPT 5
#define PD_OP_SSL_CONNECT 6
#define PD_OP_SSL_SHUTDOWN 7
#define PD_OP_EVENT 8
#define PD_OP_NOTIFY 9
#define PD_OP_TIMER 10
struct poller_data
{
short operation;
unsigned short iovcnt;
int fd;
union
{
SSL *ssl;
void *(*accept)(const struct sockaddr *, socklen_t, int, void *);
void *(*event)(void *);
void *(*notify)(void *, void *);
};
void *context;//CommService或CommConnEntry
union
{
poller_message_t *message;
struct iovec *write_iov;
void *result;
};
};
poller_data封裝了需要處理的fd、對(duì)應(yīng)的操作(operation)、上下文(可能是CommService或CommConnEntry)。
poller的核心是poller_thread,poller_start的時(shí)候啟動(dòng)了是一個(gè)poller_thread,poller_thread處理的是epoll_event,主流程是一個(gè)經(jīng)典的雙循環(huán),外層循環(huán)epoll_wait,每次最多處理256個(gè)fd,epoll返回后,再根據(jù)每個(gè)epoll_event事件的類型,循環(huán)處理每個(gè)類型的事件,從枚舉可以看到對(duì)當(dāng)前node的操作有讀、寫、listen、connect、timer等等,不管是什么類型的epoll事件,poller_thread處理的結(jié)果會(huì)生成一個(gè).poller_result,并把這個(gè)結(jié)果插入到消息隊(duì)列中。
具體的操作非常的多了,不適合靜態(tài)分析,后面再動(dòng)態(tài)分析請(qǐng)求的全流程。
poller的操作都是線程安全的,mpoller啟動(dòng)多個(gè)線程的時(shí)候也可以直接使用。
mpoller
可以看到實(shí)際上使用的并不是poller而是mpoller,mpoller是對(duì)多線程poller的封裝,一個(gè)mpoller包括至少一個(gè)poller,實(shí)際配幾個(gè)線程就創(chuàng)建幾個(gè)poller,并統(tǒng)一分配poller_node,所有poller共享poller_node數(shù)組。實(shí)際使用的時(shí)候可以根據(jù)運(yùn)算核心數(shù)和業(yè)務(wù)邏輯的復(fù)雜程度調(diào)整poller_thread和handler_thread的配比。mpoller的add、del、mod接口會(huì)對(duì)傳入的fd對(duì)線程數(shù)求模,將fd均勻的分配到各個(gè)poller。
關(guān)于數(shù)據(jù)同步
可以看到對(duì)fd的[]操作并沒有加鎖,以mpoller_add為例
mpoller_t *mpoller)
{
unsigned int index = (unsigned int)data->fd % mpoller->nthreads;
return poller_add(data, timeout, mpoller->poller[index]);
}
第4行計(jì)算index,fd和nthreads都是不會(huì)發(fā)生變化,不會(huì)修改的,線程之間無沖突,所以不需要加鎖。
第5行由poller_add來保證線程安全,每個(gè)poller中都有一個(gè)鎖,poller_add、poller_del、poller_mod的操作都是加鎖的,因?yàn)檫@三種操作都可能發(fā)生在不同的線程。
Communicator
Communicator是通訊器,是底層和業(yè)務(wù)層的樞紐,創(chuàng)建了mpoller和handler線程池,初始化時(shí)候啟動(dòng)兩個(gè)線程池,bind的時(shí)候會(huì)把服務(wù)綁到communicator上,把服務(wù)創(chuàng)建的listen_fd放入到poller中開始監(jiān)聽。handler_thread就是在Communicator中啟動(dòng)的,handler_thread從消息隊(duì)列里拿到的是poller_result,handler_thread做的是拿到任務(wù)以后根據(jù)poller_result::poller_data::operation類型做相應(yīng)處理。
相關(guān)的結(jié)構(gòu)有:
鏈接:
class WFConnection : public CommConnection 創(chuàng)建的鏈接
對(duì)端:
CommTarget通訊目標(biāo),封裝了對(duì)端的地址、port、超時(shí)時(shí)間
消息:
{
int (*append)(const void *, size_t *, poller_message_t *);
char data[0]; // 柔性數(shù)組
};
class CommMessageIn : private __poller_message
{
private:
virtual int append(const void *buf, size_t *size) = 0;
struct CommConnEntry *entry;
};
class CommMessageOut
{
private:
virtual int encode(struct iovec vectors[], int max) = 0;
};
很明顯CommMessageIn是一次通信中的輸入消息,CommMessageOut是返回的消息的基類,輸入消息的基類是__poller_message,這里又使用了一個(gè)c程序員常用的小技巧,成員char data[0]是一個(gè)柔性數(shù)組,把__poller_message變成了一個(gè)變長結(jié)構(gòu)體。
結(jié)構(gòu)體中末尾成員是一個(gè)長度為0的char數(shù)組,這樣聲明看起來和char *data是一樣的,但是這樣寫相對(duì)于char指針有一些優(yōu)勢(shì)。
對(duì)比如下結(jié)構(gòu),考慮__poller_message_test和__poller_message有什么區(qū)別
{
int (*append)(const void *, size_t *, poller_message_t *);
char *data; // char指針
};
首先,數(shù)組長度是0,說明沒分配空間。所以64位系統(tǒng)中,sizeof(struct __poller_message_test) == 16 而 sizeof(struct __poller_message) == 8。其次,如果使用一個(gè)char指針,需要為指針分配內(nèi)存。而使用data[0]則不需要二次給指針分配內(nèi)存,直接為結(jié)構(gòu)分配適量大小內(nèi)存即可,成員data會(huì)自動(dòng)指向結(jié)構(gòu)尾部的下一個(gè)字節(jié)。
輸入消息有一個(gè)append的虛方法,子類自己去定義如何反序列化,輸出消息有一個(gè)encode的虛方法,子類消息自己去定義序列化發(fā)送消息。基類__poller_message中的函數(shù)指針會(huì)被賦值為Communicator::append(const void *buf, size_t *size, poller_message_t *msg),實(shí)際運(yùn)行時(shí)由函數(shù)指針append去調(diào)用各子類消息的virtual int append(const void *buf, size_t *size)對(duì)消息進(jìn)行反序列化。
框架內(nèi)已經(jīng)定義好一些常用協(xié)議了:
會(huì)話:CommSession
CommSession封裝了一次會(huì)話所有組成單位,包括輸入/輸出消息、CommConnection、CommTarget
定義了消息的生產(chǎn)方式
服務(wù)器:CommService
類圖:
class WFServerBase : protected CommService 服務(wù)器的抽象。封裝了服務(wù)器地址、監(jiān)聽套接字、活躍鏈接和連接數(shù)、服務(wù)器參數(shù)。
基類定義了newsession、newconnect接口。WFServerBase類中實(shí)現(xiàn)了服務(wù)啟動(dòng)start()、停止stop()、創(chuàng)建/刪除鏈接newconnect()。
WFServer是一個(gè)模板類,模板參數(shù)是輸入輸出消息類型,可以實(shí)例化為各種類型的服務(wù)器,不同類型的服務(wù)器就是消息類型不同的服務(wù)器實(shí)例化,因?yàn)椴煌愋头?wù)器實(shí)例消息類型不同,處理消息方式也不同,WFServer中保存了處理消息方式的回調(diào)——processer,并在服務(wù)創(chuàng)建的時(shí)候初始化。在WFServer中定義session創(chuàng)建方式new_session()的時(shí)候,會(huì)用processer來創(chuàng)建task,process實(shí)際上是task的處理方式。
服務(wù)Start()的時(shí)候會(huì)被bind()到全局的Communicator上,包括創(chuàng)建fd、bind、listen、放入epoll監(jiān)聽,成為epoll監(jiān)聽的第一個(gè)fd。服務(wù)實(shí)際上是交給Communicator創(chuàng)建的handler_thread線程池來驅(qū)動(dòng)起來的。
Entry:CommConnEntry
打包了所有一次會(huì)話需要的上下文,包括poller、servide、session、target、socket等,處理accept事件(handle_listen_result)的時(shí)候由Communicator::accept_conn創(chuàng)建,創(chuàng)建后放在poller_data中,mpoller_add監(jiān)聽
Communicator:
有了上面這些基礎(chǔ)結(jié)構(gòu),Communicator就是一個(gè)完全體了,Communicator初始化的時(shí)候,啟動(dòng)了poller_thread、handler_thread驅(qū)動(dòng)服務(wù)進(jìn)行消息處理。
以示例代碼的hello_world程序?yàn)槔^察一次網(wǎng)絡(luò)請(qǐng)求過程,看看poller_thread和handler_thread分別都做了什么。
從hello_world啟服到線程工作:
這里特別看一下poller_add的時(shí)候創(chuàng)建了poller_node實(shí)體,poller_node中有一個(gè)成員struct __poller_node *res,__poller_data_get_event()的時(shí)候會(huì)返回一個(gè)bool值,表示是否需要?jiǎng)?chuàng)建res。可以看到操作類型為listen的情況。是需要res的。
經(jīng)過這個(gè)過程,服務(wù)器就啟動(dòng)開始接受請(qǐng)求了,service創(chuàng)建listen_fd交由poller管理,當(dāng)監(jiān)聽到有客戶端鏈接時(shí),accept+read。下面分析接收到一個(gè)請(qǐng)求時(shí),poller_thread和handler_thread分別做了什么。
poller_thread知道listenfd可讀,則accept一個(gè)readfd,創(chuàng)建了對(duì)端target,把這個(gè)poller_result(poller_node)放進(jìn)消息隊(duì)列。
handler_thread拿到這個(gè)poller_result之后,主要是創(chuàng)建了完整的CommConnEntry,并把負(fù)責(zé)read的poller_node放入epoll監(jiān)聽,等待內(nèi)核緩沖區(qū)有數(shù)據(jù)可讀。
這里有個(gè)細(xì)節(jié),readfd是無阻塞模式,因?yàn)槭褂昧薳poll的邊緣觸發(fā)模式,即每個(gè)fd的狀態(tài)變化只通知一次,這樣的話需要把readfd上的數(shù)據(jù)全讀完,所以readfd必須設(shè)置成無阻塞模式,否則循環(huán)讀到最后肯定會(huì)被阻塞。
如果遇到errorno==EAGAIN則直接返回,因?yàn)閷?duì)于fd阻塞調(diào)用eagain表示提示重試,對(duì)于非阻塞fd,errorno==EAGAIN則表示緩沖區(qū)已經(jīng)寫滿,直接return本次處理結(jié)束。
readfd放入epoll之后,readfd上有數(shù)據(jù)到來后會(huì)被操作系統(tǒng)拷進(jìn)內(nèi)核緩沖區(qū),然后epoll提示readfd可讀。poller_thread會(huì)進(jìn)入處理可讀事件(handle_read)。
poller_thread對(duì)可讀事件的處理主要是把字節(jié)流讀出來,并反序列化,放入隊(duì)列提供給handler_thread,handler_thread調(diào)service處理業(yè)務(wù)邏輯。
handler對(duì)收到的消息的處理分兩種情況,如果是服務(wù)端,當(dāng)做請(qǐng)求處理,如果是客戶端,當(dāng)回復(fù)處理,所以hello_world程序進(jìn)入請(qǐng)求處理流程。
服務(wù)器對(duì)請(qǐng)求的處理是創(chuàng)建服務(wù)對(duì)應(yīng)類型的CommRequest,helloworld中實(shí)際是執(zhí)行了一個(gè)WFHttpServerTask。
繼承關(guān)系:WFHttpServerTask——>WFServerTask——>WFNetworkTask——>CommRequest——>SubTask,CommSession。
SubTask和CommSession后面再仔細(xì)分析,這里先從字面理解,SubTask就是任務(wù),就是處理自定義邏輯的過程,CommSession是會(huì)話。那handle的時(shí)候會(huì)先調(diào)用當(dāng)前Task的processor.dispatch()執(zhí)行任務(wù),任務(wù)執(zhí)行完自動(dòng)subtask_done()的時(shí)候會(huì)調(diào)用scheduler->reply(),將結(jié)果返回 Send_message()。可以看到Send_message是先嘗試同步寫,如果同步寫失敗了,再嘗試異步寫,異步寫的過程就是先把文件描述符加入epoll監(jiān)聽,等待可寫信號(hào)出現(xiàn)后,再寫入。寫的時(shí)候使用iovec,聚集寫盡量減少拷貝次數(shù)。
至此poller事件各種operation的處理,已經(jīng)分析過PD_OP_READ、PD_OP_WRITE、PD_OP_LISTEN,再通過wget看一下PD_OP_CONNECT。
connect主要是處理客戶端鏈接服務(wù)端時(shí),服務(wù)端無法立刻建立鏈接時(shí)的等待,異步等待屏蔽等待時(shí)間。
request的時(shí)候會(huì)優(yōu)先檢查目標(biāo)上有沒有idle鏈接,如果有的話直接復(fù)用,如果沒有會(huì)創(chuàng)建connect,conn_fd是非阻塞的,operation設(shè)置為PD_OP_CONNECT,放在epoll中管理,等待fd可用。
可以看到,是一個(gè)簡(jiǎn)單的發(fā)送請(qǐng)求,等待結(jié)果的過程。
poller事件共有10種operation,這里分析過讀、寫、connect、listen四種流程,PD_OP_SSL_ACCEPT、PD_OP_SSL_CONNECT、PD_OP_SSL_SHUTDOWN三個(gè)只是使用openssl庫時(shí)的創(chuàng)建和關(guān)閉鏈接。還有另外兩種事件:PD_OP_EVENT、PD_OP_NOTIFY,這兩種分別是linux和mac環(huán)境下處理異步文件I/O用的。
異步文件I/O:
TODO
任務(wù)組織調(diào)度層
下面分析任務(wù)線程是如何執(zhí)行任務(wù)的邏輯。這個(gè)層次有兩個(gè)核心基礎(chǔ)概念,一個(gè)是任務(wù)的抽象,一個(gè)是會(huì)話(session)的抽象,二者是所有執(zhí)行邏輯的祖爺爺和祖奶奶。
任務(wù):
前面看到對(duì)于請(qǐng)求的處理,實(shí)際是執(zhí)行了CommRequest,CommRequest既是一個(gè)SubTask又是一個(gè)CommSession,最后是通過執(zhí)行的是SubTask的接口dispatch()執(zhí)行起來的,這里最重要的概念——子任務(wù)。workflow里面所有的邏輯,最后都是通過子任務(wù)執(zhí)行起來的;子任務(wù)又可以通過各種組合關(guān)系,串并聯(lián)的組織起來。
這里有四個(gè)重要的基本元素:
1,SubTask——子任務(wù),是一切任務(wù)的祖先。
2、ParallelTask——并行任務(wù),并行任務(wù)里面管理SubTask數(shù)組,啟動(dòng)時(shí)會(huì)把自己管理的SubTask一個(gè)一個(gè)全部dispatch一遍。
3、SeriesWork——串聯(lián)工作組,里面管理了一個(gè)數(shù)組的子任務(wù),逐個(gè)執(zhí)行。
4、ParallelWork——并聯(lián)工作組,里面管理了一個(gè)SeriesWork數(shù)組,其本身的祖先是一個(gè)SubTask,所以他可以被SeriesWork管理。
這樣就實(shí)現(xiàn)了任務(wù)的串并聯(lián)執(zhí)行甚至以DAG的形式復(fù)合。
下面逐一分析:
class SubTask{
public:
virtual void dispatch() = 0;
private:
virtual SubTask *done() = 0;
protected:
void subtask_done();
private:
ParallelTask *parent;
SubTask **entry;
void *pointer;
};
SubTask是一切執(zhí)行任務(wù)的祖先,不同的任務(wù)實(shí)現(xiàn),實(shí)現(xiàn)不同的dispatch()和done()接口,提供兩個(gè)接口留給用戶自定義:
1、dispatch()接口 就是執(zhí)行任務(wù),用戶任務(wù)自定義執(zhí)行邏輯,而在執(zhí)行結(jié)束后,必須調(diào)用subtask_done()。
2、done()接口 在任務(wù)邏輯執(zhí)行結(jié)束后,由subtask_done()調(diào)起done(),這個(gè)接口是用戶自定義的結(jié)束回調(diào),在done()接口里面回收資源,銷毀任務(wù)。done()函數(shù)還會(huì)返回一個(gè)子任務(wù)的指針,當(dāng)當(dāng)前任務(wù)執(zhí)行完還要執(zhí)行下一個(gè)任務(wù)的時(shí)候,返回下一個(gè)任務(wù),如果沒有下一個(gè)任務(wù),則返回nullptr。為什么這么約定呢?這需要看一下subtask_done()函數(shù)的工作方式。
需要知道成員變量的意思才能明白調(diào)度方式:
pointer 一般指向當(dāng)前所在SeriesWork,SubWork最后也是放在SeriesWork之中啟動(dòng)起來的;
parent 當(dāng)一個(gè)子任務(wù)被ParallelTask任務(wù)管理的時(shí)候,parent指向被管理的并行任務(wù)。
entry 指向待執(zhí)行任務(wù)數(shù)組的首位。
subtask_done():仔細(xì)解讀一下subtask_done()的工作方式:
{
SubTask *cur = this;
ParallelTask *parent;
SubTask **entry;
while (1){
parent = cur->parent;
entry = cur->entry;
cur = cur->done();
if (cur){
cur->parent = parent;
cur->entry = entry;
if (parent)
*entry = cur;
cur->dispatch();
}
else if (parent) {
if (__sync_sub_and_fetch(&parent->nleft, 1) == 0) {
cur = parent;
continue;
}
}
break;
}
}
可以看到先保存了當(dāng)前任務(wù)的parent和entry,然后直接調(diào)用了當(dāng)前任務(wù)的done()接口。如果又返回了一個(gè)子任務(wù),則調(diào)用新任務(wù)的dispatch(),使其運(yùn)行起來,dispatch()到最后必然又會(huì)調(diào)用新任務(wù)的subtask_done();從而遞歸執(zhí)行這條線上所有任務(wù),直至done()不會(huì)再返回任務(wù);當(dāng)不再返回任務(wù)時(shí),說明parent的孩子都執(zhí)行完,就可以繼續(xù)再往上執(zhí)行(parent也是一個(gè)SubTask),直至根任務(wù)執(zhí)行完。
ParallelTask:
ParallelTask是SubTask的兒子,結(jié)構(gòu)很簡(jiǎn)單,管理了一個(gè)SubTask數(shù)組,ParallelTask::dispatch()的時(shí)候會(huì)把數(shù)組內(nèi)管理的所有SubTask逐一dispatch()一遍,這樣的話就實(shí)現(xiàn)了同級(jí)任務(wù)的并列執(zhí)行,特別注意并列執(zhí)行不一定是并行,是否并行取決于調(diào)度。任務(wù)本身是順序dispatch()的,如果dispatch調(diào)度的時(shí)候把任務(wù)放入線程池執(zhí)行任務(wù)就是并行的。
SeriesWork:
SeriesWork是一個(gè)有鎖的線程安全隊(duì)列,隊(duì)列中存儲(chǔ)了需要按順序執(zhí)行的SubTask,預(yù)分配4個(gè)空間,如果入隊(duì)時(shí)隊(duì)列已滿,則像vector一樣拓展二倍空間。
SubTask都是放到SeriesWork中執(zhí)行的。SeriesWork是怎么調(diào)度執(zhí)行任務(wù)的?啟動(dòng)函數(shù)Start(),會(huì)從第一個(gè)SubTask開始dispatch(),可以看到多數(shù)任務(wù)Task的done()的實(shí)現(xiàn)都是返回return series->pop();意思就是當(dāng)前任務(wù)執(zhí)行完了,返回當(dāng)前所在的SeriesWork中的下一個(gè)任務(wù),繼續(xù)執(zhí)行,直至所有任務(wù)執(zhí)行完。
注意SeriesWork本身不是一個(gè)SubTask,所以無法被SeriesWork管理。
ParallelWork:
ParallelWork稍微復(fù)雜一點(diǎn)
繼承關(guān)系:ParallelWork——>ParallelTask——>SubTask
可見:1、ParallelWork是一個(gè)SubTask,所以可以被SeriesWork管理;2、ParallelWork同時(shí)也是一個(gè)ParallelTask,管理了一個(gè)數(shù)組的SubTask;3、ParallelWork管理了一個(gè)SeriesWork數(shù)組,這個(gè)數(shù)組的長度和SubTask數(shù)組的長度相同。并且讓SubTask指向同索引SeriesWork的首個(gè)SubTask。
ParallelWork是怎樣啟動(dòng)和調(diào)度任務(wù)的:
ParallelWork本身是一個(gè)SubTask,所以啟動(dòng)時(shí)把他放入一個(gè)SeriesWork,作為SeriesWork的firsttask被調(diào)起dispatch();然后ParallelWork本身是一個(gè)ParallelTask,dispatch的時(shí)候會(huì)把其下管理的所有的SubTask逐個(gè)啟動(dòng)dispatch();如圖,SubTask指向的實(shí)際是管理的SeriesWork的first Task,所以實(shí)際上相當(dāng)于啟動(dòng)了管理的所有SeriesWork。
這四個(gè)結(jié)構(gòu)就是整個(gè)任務(wù)調(diào)度的基石,所有的邏輯都是作為任務(wù)執(zhí)行起來的。并行任務(wù)管理串行任務(wù),串行任務(wù)管理SubTask(并行任務(wù)也是SubTask),這套設(shè)定使任務(wù)可以自由復(fù)合DAG復(fù)合。
這時(shí)可以明白這個(gè)框架名字所謂WorkFlow,其核心就是組織任務(wù)的執(zhí)行流,所有的執(zhí)行邏輯都是任務(wù)。
會(huì)話(session):
想要執(zhí)行的邏輯,通過成為SubTask可以啟動(dòng)起來,并按一定的順序調(diào)度,那具體做的事,則被抽象為會(huì)話。
基礎(chǔ)session有四種:CommSession、ExecSession、IOSession、SleepSession,分別代表網(wǎng)絡(luò)操作、運(yùn)算操作、I/O操作、睡眠操作,session都需要實(shí)現(xiàn)handle()接口,所有最后執(zhí)行的任務(wù)都是這四種操作派生出來的。
SubTask這個(gè)大渣男分別和四種session結(jié)合生成了CommRequest、ExecRequest、SleepRequest、IORequest,使得所有的request都可以被作為子任務(wù)調(diào)度,都有state和error。
四種request分別派生出了WFNetWorkTask、WFThreadTask、WFTimerTask、WFFileTask。其中WFNetWorkTask和WFThreadTask都是兩個(gè)參數(shù)的模板類。對(duì)通信任務(wù)來說,參數(shù)是請(qǐng)求消息和回復(fù)消息,對(duì)于計(jì)算任務(wù)來說參數(shù)是輸入和輸出,WFReduceTask、WFSortTask、WFMergeTask是不用參數(shù)的的實(shí)例化,WFHttpTask、WFRedisTask、WFMysqlTask、WFKafkaTask只不過是不同協(xié)議的WFNetWorkTask的實(shí)例化。
CommRequest派生了WFNetworkTask;ExecRequest派生了WFThreadTask,二者都加入了輸入輸出模板參數(shù),和一些控制參數(shù),提供了方便的啟動(dòng)多線程任務(wù)和網(wǎng)絡(luò)任務(wù)的方式。更有WFMultiThreadTask任務(wù),批量管理多線程任務(wù)。
這里還有一個(gè)WFTimerTask,實(shí)現(xiàn)了不占線程的定時(shí)功能.。
WFTimerTask:
WFTimerTask可以讓任務(wù)休眠一定時(shí)長后執(zhí)行,不占線程,達(dá)到時(shí)長之后返回執(zhí)行回調(diào),就是定時(shí)任務(wù)。
如果一個(gè)WFTimerTask被直接start(),則創(chuàng)建一個(gè)SeriesWork,并dispatch()起來,如果是串在其他的SeriesWork,當(dāng)執(zhí)行到這個(gè)task的時(shí)候直接dispatch()。
當(dāng)SleepRequest被dispatch()時(shí)候,實(shí)際是調(diào)用當(dāng)前scheduler(即communicator)的sleep(),實(shí)際是取出當(dāng)前WFTimerTask的休眠時(shí)間,然后創(chuàng)建一個(gè)定時(shí)任務(wù)mpoller_add_timer交給epoll管理,等epoll提示時(shí)間到了,再切回來執(zhí)行。
層次結(jié)構(gòu):
借用一張官圖非常清楚的表達(dá)清楚任務(wù)之間的層次關(guān)系。
用戶接口
至此,底層支持都分析過了,下面看看通過這些底層結(jié)構(gòu)可以組織出什么花樣。
其他Tasks
WFCounterTask:
CounterTask是一個(gè)計(jì)數(shù)器Task,任務(wù)里保存了一個(gè)原子的unsigned用來計(jì)數(shù),初始化時(shí)候傳入需要記的個(gè)數(shù),每次任務(wù)被dispatch()的時(shí)候,計(jì)數(shù)器減一,直到計(jì)數(shù)器為0時(shí),執(zhí)行回調(diào),配合一個(gè)阻塞信號(hào)量,可以實(shí)現(xiàn)一批并行任務(wù)的統(tǒng)一等待,如:WaitGroup。
可能是覺得手動(dòng)創(chuàng)建CounterTask不夠優(yōu)雅,框架還創(chuàng)建了CounterTask管理器,用一個(gè)紅黑樹以名字為key統(tǒng)一管理CounterTask,可以通過名字全局操作CounterTask。
WaitGroup
既然說到了就順便說一下WaitGroup。
WaitGroup實(shí)現(xiàn)了阻塞等待多個(gè)任務(wù)完成的效果。
WaitGroup由一個(gè)原子的等待個(gè)數(shù),一個(gè)WFCounterTask和一個(gè)std::future組成。構(gòu)造時(shí)創(chuàng)建一個(gè)std::promise,并綁定到future上;創(chuàng)建一個(gè)計(jì)數(shù)1的CounterTask并注冊(cè)回調(diào),回調(diào)中時(shí)給promise->setvalue()。
每次調(diào)用done會(huì)給剩余個(gè)數(shù)減一,當(dāng)減完時(shí),counter->done(),這時(shí)回調(diào)會(huì)告訴futrue,所有任務(wù)都完成了,阻塞結(jié)束。
WFGraphNode和WFGraphTask:
WFGraphTask實(shí)現(xiàn)了將任務(wù)迅速的組織成有向無環(huán)圖的方法,一個(gè)WFGraphTask管理了一張由多個(gè)WFGraphNode組成。
WFGraphNode是一個(gè)WFCounterTask,并加入了一個(gè)WFGraphNode*列表:follower,follower表達(dá)了鄰接關(guān)系,保存的就是依賴當(dāng)前任務(wù)的下游節(jié)點(diǎn)。因?yàn)槭莄ounter任務(wù),所以具有計(jì)數(shù)的功能,記的數(shù)就是當(dāng)前Node的入度。在當(dāng)前任務(wù)執(zhí)行完之后,會(huì)把所有下游節(jié)點(diǎn)都dispatch(計(jì)數(shù))一次,當(dāng)計(jì)數(shù)減少到0時(shí),說明當(dāng)前Node所有依賴已經(jīng)完成了,就把當(dāng)前graphNode上掛的SeriesWork執(zhí)行起來。
依賴處理:當(dāng)一個(gè)node1依賴Node2時(shí)候,Node2的下游節(jié)點(diǎn)列表里加入Node1,Node1的入度自增。
執(zhí)行處理:當(dāng)Node2執(zhí)行完,Node1的入度減一。
框架的重載了GraphNode的自增運(yùn)算符和大于號(hào)、小于號(hào),自增運(yùn)算符返回Node本身。大于號(hào)、小于號(hào)運(yùn)算符調(diào)用依賴關(guān)系函數(shù)。從而很形象的可以通過如下語法表達(dá)節(jié)點(diǎn)之間的依賴關(guān)系:
a-->c;
b-->d;
c-->d;
是不是很秀?簡(jiǎn)直妙不可言
再說一個(gè)細(xì)節(jié):DAG建立起來了,但是Node上是怎么掛的任務(wù)呢?
答:創(chuàng)建WFGraphNode通過統(tǒng)一接口:WFGraphNode& WFGraphTask::create_graph_node(SubTask *task),創(chuàng)建的時(shí)候傳入你想要執(zhí)行的任務(wù),然后把要執(zhí)行的任務(wù)和當(dāng)前Counter任務(wù)串在一個(gè)Series里面。當(dāng)當(dāng)前Node計(jì)數(shù)器第一次變0的時(shí)候,會(huì)調(diào)到Done(),看一下關(guān)鍵的done()實(shí)現(xiàn):
{
SeriesWork *series = series_of(this);
if (!this->user_data)//首次done會(huì)進(jìn)這里
{
this->value = 1;//value=1使該任務(wù)再執(zhí)行一次就可以達(dá)到結(jié)束狀態(tài)
this->user_data = (void *)1;//下次再進(jìn)來就不進(jìn)這個(gè)分支了,而是直接delete this;
}
else
delete this;
return series->pop();
}
首次done()的時(shí)候不析構(gòu),并將狀態(tài)置為下次進(jìn)來析構(gòu)(value賦1&&user_data非空)。
然后將本series里面要執(zhí)行的用戶任務(wù)執(zhí)行起來。當(dāng)用戶任務(wù)執(zhí)行完,會(huì)再次執(zhí)行到GraphNode->Done();這時(shí)侯,Node析構(gòu),并將所有follower->dispatch()起來。這就是圖任務(wù)的整體執(zhí)行路徑。
WFRepeaterTask:
這是一個(gè)遞歸Task,繼承自GenericTask,也就是說啟動(dòng)時(shí),會(huì)創(chuàng)建一個(gè)Series,并把Series啟動(dòng)起來。創(chuàng)建的時(shí)候傳入創(chuàng)建任務(wù)的回調(diào)Create,在dispatch()得時(shí)候,往當(dāng)前Series里傳入兩個(gè)任務(wù),一個(gè)是Create回調(diào)創(chuàng)建出來的新任務(wù),一個(gè)是當(dāng)前任務(wù)。這樣的話,順序任務(wù)的調(diào)度就變成:執(zhí)行任務(wù)—》創(chuàng)建任務(wù)—》執(zhí)行任務(wù)。。。
WFConditional:
WFConditional是條件任務(wù)包裝器,可以把其他任務(wù)包裝成條件任務(wù),通過一個(gè)atomic變量實(shí)現(xiàn)。新增加一個(gè)signal接口,當(dāng)dispatch和signal都執(zhí)行后,任務(wù)會(huì)被執(zhí)行。原理:當(dāng)任務(wù)被dispatch或者signal時(shí),都會(huì)去設(shè)置原子bool的值,并檢查狀態(tài),如果設(shè)置過狀態(tài),就調(diào)起任務(wù),可見第一次不會(huì)調(diào)起,第二次才會(huì)調(diào)起任務(wù)。
為了避免發(fā)送signal者持有條件任務(wù)的裸指針,框架還提供了全局的命名的條件任務(wù),發(fā)送者可以根據(jù)名字給conditional發(fā)signal,內(nèi)部是一個(gè)觀察者模式,以cond的名字為key構(gòu)建了一個(gè)紅黑樹管理,當(dāng)signal某個(gè)key的時(shí)候,找到對(duì)應(yīng)的條件任務(wù)發(fā)送signal()。
WFModuleTask:
WFModuleTask提供了一個(gè)模塊級(jí)的封裝,可以把一系列任務(wù)封裝到一個(gè)模塊里,可以注冊(cè)一個(gè)模塊的回調(diào)函數(shù)。WFModuleTask本質(zhì)上還是一個(gè)SeriesWork,把一系列任務(wù)封裝在一起,降低功能任務(wù)之間的耦合程度。
服務(wù)
基于workflow框架我們可以迅速的構(gòu)建http服務(wù)器,只需要幾行代碼:
{
WFHttpServer server([](WFHttpTask *task) {
task->get_resp()->append_output_body("Hello World!");
});
if (server.start(8888) == 0) { // start server on port 8888
getchar(); // press "Enter" to end.
server.stop();
}
return 0;
}
可以看到構(gòu)造一個(gè)WFHttpServer,只要傳入一個(gè)處理WFHttpTask的回調(diào)函數(shù)即可。
下面分別看 WFHttpServer 、WFServerTask
WFHttpServer
首先WFHttpServer是WFServer的http消息時(shí)的特化版本。WFServer在BaseServer的基礎(chǔ)上增加了輸入輸出模板參數(shù),并增加了一個(gè)可以處理WFNetworkTask的回調(diào)函數(shù),同時(shí)重寫了new_session方法;
poller在create_message的時(shí)候會(huì)調(diào)到new_session,創(chuàng)建WFServerTask;
Communicator并不知道Service是什么類型的service,在create_message的時(shí)候不管是什么類型的service,都調(diào)用service對(duì)應(yīng)的new_session接口去生產(chǎn)session交給Poller去生成任務(wù)交由線程池執(zhí)行。
WFServerTask
WFServerTask繼承自WFHttpTask,WFServerTask內(nèi)定義了兩個(gè)局部類,Processor和Series。
前者Processor保存著服務(wù)初始化時(shí)傳入的回調(diào)和當(dāng)前WFServerTask的指針,dispatch時(shí)執(zhí)行回調(diào)處理當(dāng)前任務(wù)。
后者Series本質(zhì)上是一個(gè)SeriesWork,把Processor和當(dāng)前任務(wù)串起來,并先執(zhí)行Processor,最后執(zhí)行當(dāng)前WFServerTask,當(dāng)前任務(wù)負(fù)責(zé)reply。同時(shí)負(fù)責(zé)引用計(jì)數(shù),讓service知道有多少任務(wù)在引用。
服務(wù)小結(jié)
session是被動(dòng)產(chǎn)生的,服務(wù)是靜態(tài)定義的,服務(wù)定義了自己的服務(wù)類型、和產(chǎn)生任務(wù)的方法、處理任務(wù)的回調(diào)等等,然后在服務(wù)啟動(dòng)的時(shí)候綁定地址創(chuàng)建fd,把自己綁定到Communicator上,交給Reactor去調(diào)度。
-
服務(wù)器
+關(guān)注
關(guān)注
12文章
9123瀏覽量
85324 -
編程
+關(guān)注
關(guān)注
88文章
3614瀏覽量
93686 -
代碼
+關(guān)注
關(guān)注
30文章
4779瀏覽量
68524
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論