什么是select?
有的朋友可能對select也不是很了解啊,我這里稍微科普一下:網(wǎng)絡(luò)連接,服務(wù)器也是通過文件描述符來管理這些連接上來的客戶端,既然是供連接的服務(wù)器,那就免不了要接收來自客戶端的消息。那么多臺客戶端,消息那么的多,要是漏了一條兩條重要消息,那也不要用TCP了,那怎么辦?
前輩們就是有辦法,輪詢,輪詢每個客戶端文件描述符,查看他們是否帶著消息,如果帶著,那就處理一下;如果沒帶著,那就一邊等著去。這就是select,輪詢,頗有點(diǎn)領(lǐng)導(dǎo)下基層的那種感覺哈。
但是這個select的輪詢吶,會有個問題,明眼人一下就能想到,那即是耗費(fèi)資源啊,耗費(fèi)什么資源,時間吶,慢吶(其實(shí)也挺快了,不過相對epoll來說就是慢)。 再認(rèn)真想一下,還浪費(fèi)什么資源,系統(tǒng)資源。有的客戶端吶,占著那啥玩意兒不干那啥事兒,這種客戶端吶,還不少。這也怪不得人家,哪兒有客戶端時時刻刻在發(fā)消息,要是有,那就要小心是不是惡意攻擊了。那把這么一堆偶爾動一下的客戶端的文件描述符一直攥手里,累不累?能一次攥多少個?就像一個老板,一直想著下去巡視,那他可以去當(dāng)車間組長了哈哈哈。
所以,select的默認(rèn)上限一般是1024(FD_SETSIZE),當(dāng)然我們可以手動去改,但是人家給個1024自然有人家的道理,改太大的話系統(tǒng)在這一塊的負(fù)載就大了。 那句話怎么說的來著,你每次對系統(tǒng)的索取,其實(shí)都早已明碼標(biāo)價!哈哈哈。。。
所以,我們選用epoll模型。
什么是epoll?
epoll接口是為解決Linux內(nèi)核處理大量文件描述符而提出的方案。該接口屬于Linux下多路I/O復(fù)用接口中select/poll的增強(qiáng)。其經(jīng)常應(yīng)用于Linux下高并發(fā)服務(wù)型程序,特別是在大量并發(fā)連接中只有少部分連接處于活躍下的情況 (通常是這種情況),在該情況下能顯著的提高程序的CPU利用率。
前面說,select就像親自下基層視察的老板,那么epoll這個老板就要顯得精明的多了。他可不親自下基層,他找了個美女秘書,他只要盯著他的秘書看就行了,呸,他只需要聽取他的秘書的匯報就行了。匯報啥呢?基層有任何消息,跟秘書說,秘書匯總之后一次性交給老板來處理。這樣老板的時間不就大大的提高了嘛。
如果你學(xué)過設(shè)計模式,這就是典型的“命令模式”,非常符合“依賴倒置原則”,這是一個非常美妙的模式,這個原則也是我最喜歡的一個原則,將高層實(shí)現(xiàn)與低層實(shí)現(xiàn)解耦合,從而可以各自開發(fā),只要接口一致便可,這個接口,就是秘書。
扯遠(yuǎn)了,如果對“設(shè)計模式”有興趣,可以找我的專欄。
好,言歸正傳哈哈哈。
epoll的設(shè)計思路
- (1)epoll在Linux內(nèi)核中構(gòu)建了一個文件系統(tǒng),該文件系統(tǒng)采用紅黑樹來構(gòu)建,紅黑樹在增加和刪除上面的效率極高,因此是epoll高效的原因之一。有興趣可以百度紅黑樹了解,但在這里你只需知道其算法效率超高即可。
- (2)epoll提供了兩種觸發(fā)模式,水平觸發(fā)(LT)和邊沿觸發(fā)(ET)。當(dāng)然,涉及到I/O操作也必然會有阻塞和非阻塞兩種方案。目前效率相對較高的是 epoll+ET+非阻塞I/O 模型,在具體情況下應(yīng)該合理選用當(dāng)前情形中最優(yōu)的搭配方案。
- (3)epoll所支持的FD上限是最大可以打開文件的數(shù)目,這個數(shù)字一般遠(yuǎn)大于1024,舉個例子,在1GB內(nèi)存的機(jī)器上大約是10萬左右,具體數(shù)目可以下面語句查看,一般來說這個數(shù)目和系統(tǒng)內(nèi)存關(guān)系很大。
阻塞I/O與非阻塞I/O
為了方便理解后面的內(nèi)容,我們先看幾張圖,關(guān)于阻塞與非阻塞I/O的。
阻塞式文件I/O
非阻塞式文件I/O
多路復(fù)用I/O
好,有了上面這幾張圖墊著,咱來看看邊緣觸發(fā)和水平觸發(fā)。
邊緣觸發(fā) VS 水平觸發(fā)
EPOLL 事件有兩種模型: Edge Triggered (ET) 邊緣觸發(fā) 只有新數(shù)據(jù)到來,才觸發(fā),不管緩存區(qū)中是否還有數(shù)據(jù)。 Level Triggered (LT) 水平觸發(fā) 只要有數(shù)據(jù)都會觸發(fā),不管數(shù)據(jù)是哪里的。 (這樣表述會不會好理解一些)
LT(level triggered) 是 缺省 的工作方式 ,并且同時支持 block 和 no-block socket. 在這種做法中,內(nèi)核告訴你一個文件描述符是否就緒了,然后你可以對這個就緒的 fd 進(jìn)行 IO 操作。如果你不作任何操作,內(nèi)核還是會繼續(xù)通知你的,所以,這種模式編程出錯誤可能性要小一點(diǎn)。傳統(tǒng)的 select/poll 都是這種模型的代表.
ET(edge-triggered) 是高速工作方式 ,只支持 no-block socket 。在這種模式下,當(dāng)描述符從未就緒變?yōu)榫途w時,內(nèi)核通過 epoll 告訴你。然后它會假設(shè)你知道文件描述符已經(jīng)就緒,并且不會再為那個文件描述符發(fā)送更多的就緒通知,直到你做了某些操作導(dǎo)致那個文件描述符不再為就緒狀態(tài)了 ( 比如,你在發(fā)送,接收或者接收請求,或者發(fā)送接收的數(shù)據(jù)少于一定量時導(dǎo)致了一個 EWOULDBLOCK 錯誤)。但是請注意,如果一直不對這個 fd 作 IO 操作 ( 從而導(dǎo)致它再次變成未就緒 ) ,內(nèi)核不會發(fā)送更多的通知 (only once), 不過在 TCP 協(xié)議中, ET 模式的加速效用仍需要更多的 benchmark 確認(rèn)。
要設(shè)置ET:在epoll_ctl函數(shù)中配置上EPOLLET即可
epoll 工作在 ET 模式的時候,必須使用非阻塞套接口,以避免由于一個文件句柄的阻塞讀 / 阻塞寫操作把處理多個文件描述符的任務(wù)餓死。最好以下面的方式調(diào)用 ET 模式的 epoll 接口,在后面會介紹避免可能的缺陷。
- 基于非阻塞文件句柄
- 只有當(dāng) read(2) 或者 write(2) 返回 EAGAIN 時才需要掛起,等待。但這并不是說每次 read() 時都需要循環(huán)讀,直到讀到產(chǎn)生一個 EAGAIN 才認(rèn)為此次事件處理完成,當(dāng) read() 返回的讀到的數(shù)據(jù)長度小于請求的數(shù)據(jù)長度時,就可以確定此時緩沖中已沒有數(shù)據(jù)了,也就可以認(rèn)為此事讀事件已處理完成。
epoll API
epoll提供的API,我所用過的其實(shí)不多,無非就那么幾個。 所以我就只能聊聊我說用過的。
頭文件
#include< sys/epoll.h >
創(chuàng)建句柄
int epoll_create(int size);
創(chuàng)建一個epoll句柄,參數(shù)size用于告訴內(nèi)核監(jiān)聽的文件描述符個數(shù),跟內(nèi)存大小有關(guān)。 返回epoll 文件描述符
控制某個epoll監(jiān)控的文件描述符上的事件:注冊,修改,刪除
參數(shù)釋義: epfd:為epoll的句柄 op:表示動作,用3個宏來表示 ··· EPOLL_CTL_ADD(注冊新的 fd 到epfd) ··· EPOLL_CTL_DEL(從 epfd 中刪除一個 fd) ··· EPOLL_CTL_MOD(修改已經(jīng)注冊的 fd 監(jiān)聽事件)
event:告訴內(nèi)核需要監(jiān)聽的事件
typedef union epoll_data
{
void* ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t; /* 保存觸發(fā)事件的某個文件描述符相關(guān)的數(shù)據(jù) */
struct epoll_event
{
__uint32_t events; /* epoll event */
epoll_data_t data; /* User data variable */
};
/* epoll_event.events:
EPOLLIN 表示對應(yīng)的文件描述符可以讀
EPOLLOUT 表示對應(yīng)的文件描述符可以寫
EPOLLPRI 表示對應(yīng)的文件描述符有緊急的數(shù)據(jù)可讀
EPOLLERR 表示對應(yīng)的文件描述符發(fā)生錯誤
EPOLLHUP 表示對應(yīng)的文件描述符被掛斷
EPOLLET 設(shè)置ET模式
*/
epoll消息讀取
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
等待所監(jiān)控文件描述符上有事件的產(chǎn)生
參數(shù)釋義: events:用來從內(nèi)核得到事件的集合 maxevent:用于告訴內(nèi)核這個event有多大,這個maxevent不能大于創(chuàng)建句柄時的size timeout:超時時間 ··· -1:阻塞 ··· 0:立即返回 ···>0:指定微秒
成功返回有多少個文件描述符準(zhǔn)備就緒,時間到返回0,出錯返回-1.
代碼示例
/* 實(shí)現(xiàn)功能:通過epoll, 處理多個socket
* 監(jiān)聽一個端口,監(jiān)聽到有鏈接時,添加到epoll_event
* xs
*/
#include < stdio.h >
#include < stdlib.h >
#include < string.h >
#include < sys/socket.h >
#include < poll.h >
#include < sys/epoll.h >
#include < sys/time.h >
#include < netinet/in.h >
#include < unistd.h >
#define MYPORT 12345
//最多處理的connect
#define MAX_EVENTS 500
//當(dāng)前的連接數(shù)
int currentClient = 0;
//數(shù)據(jù)接受 buf
#define REVLEN 10
char recvBuf[REVLEN];
//epoll描述符
int epollfd;
//事件數(shù)組
struct epoll_event eventList[MAX_EVENTS];
void AcceptConn(int srvfd);
void RecvData(int fd);
int main()
{
int i, ret, sinSize;
int recvLen = 0;
fd_set readfds, writefds;
int sockListen, sockSvr, sockMax;
int timeout;
struct sockaddr_in server_addr;
struct sockaddr_in client_addr;
//socket
if((sockListen=socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
printf("socket errorn");
return -1;
}
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(MYPORT);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
//bind
if(bind(sockListen, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0)
{
printf("bind errorn");
return -1;
}
//listen
if(listen(sockListen, 5) < 0)
{
printf("listen errorn");
return -1;
}
// epoll 初始化
epollfd = epoll_create(MAX_EVENTS);
struct epoll_event event;
event.events = EPOLLIN|EPOLLET;
event.data.fd = sockListen;
//add Event
if(epoll_ctl(epollfd, EPOLL_CTL_ADD, sockListen, &event) < 0)
{
printf("epoll add fail : fd = %dn", sockListen);
return -1;
}
//epoll
while(1)
{
timeout=3000;
//epoll_wait
int ret = epoll_wait(epollfd, eventList, MAX_EVENTS, timeout);
if(ret < 0)
{
printf("epoll errorn");
break;
}
else if(ret == 0)
{
printf("timeout ...n");
continue;
}
//直接獲取了事件數(shù)量,給出了活動的流,這里是和poll區(qū)別的關(guān)鍵
int i = 0;
for(i=0; i< ret; i++)
{
//錯誤退出
if ((eventList[i].events & EPOLLERR) ||
(eventList[i].events & EPOLLHUP) ||
!(eventList[i].events & EPOLLIN))
{
printf ( "epoll errorn");
close (eventList[i].data.fd);
return -1;
}
if (eventList[i].data.fd == sockListen)
{
AcceptConn(sockListen);
}else{
RecvData(eventList[i].data.fd);
}
}
}
close(epollfd);
close(sockListen);
return 0;
}
/**************************************************
函數(shù)名:AcceptConn
功能:接受客戶端的鏈接
參數(shù):srvfd:監(jiān)聽SOCKET
***************************************************/
void AcceptConn(int srvfd)
{
struct sockaddr_in sin;
socklen_t len = sizeof(struct sockaddr_in);
bzero(&sin, len);
int confd = accept(srvfd, (struct sockaddr*)&sin, &len);
if (confd < 0)
{
printf("bad acceptn");
return;
}else
{
printf("Accept Connection: %d", confd);
}
//將新建立的連接添加到EPOLL的監(jiān)聽中
struct epoll_event event;
event.data.fd = confd;
event.events = EPOLLIN|EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, confd, &event);
}
//讀取數(shù)據(jù)
void RecvData(int fd)
{
int ret;
int recvLen = 0;
memset(recvBuf, 0, REVLEN);
printf("RecvData functionn");
if(recvLen != REVLEN)
{
while(1)
{
//recv數(shù)據(jù)
ret = recv(fd, (char *)recvBuf+recvLen, REVLEN-recvLen, 0);
if(ret == 0)
{
recvLen = 0;
break;
}
else if(ret < 0)
{
recvLen = 0;
break;
}
//數(shù)據(jù)接受正常
recvLen = recvLen+ret;
if(recvLen< REVLEN)
{
continue;
}
else
{
//數(shù)據(jù)接受完畢
printf("buf = %sn", recvBuf);
recvLen = 0;
break;
}
}
}
printf("data is %s", recvBuf);
}
整體拔高:高效的并發(fā)方式
并發(fā)編程的目的是讓程序”同時”執(zhí)行多個任務(wù)。如果程序是計算密集型的,并發(fā)編程并沒有什么優(yōu)勢,反而由于任務(wù)的切換使效率降低。但如果程序是I/O密集型的,那就不同了。
并發(fā)模式是指I/O處理單元和多個邏輯單元之間協(xié)調(diào)完成任務(wù)的方法,服務(wù)器主要有兩種并發(fā)編程模式:半同步/半異步(half-sync/half-async)模式和領(lǐng)導(dǎo)者/追隨者(Leader/Followers)模式。
這里講一個“半同步/半異步”。
下面的內(nèi)容需要有一定的基礎(chǔ)了,小白可以收藏一下以后變強(qiáng)了再看。
半同步/半異步模式
在半同步/半異步模式中,同步線程用于處理客戶邏輯,異步線程用于處理I/O事件。異步線程監(jiān)聽到客戶請求之后就將其封裝成請求對象并插入到請求隊(duì)列中。請求隊(duì)列將通知某個工作在同步模式的工作線程來讀取并處理該請求對象。
半同步/半反應(yīng)堆模式(half-sync/half-reactive模式)
半同步/半反應(yīng)堆模式是半同步/半異步模式的一種變體。 其結(jié)構(gòu)如下圖:
在上圖中,異步線程只有一個,由主線程充當(dāng),負(fù)責(zé)監(jiān)聽socket上的事件。如果監(jiān)聽socket上有新的連接請求到來,主線程就接受新的連接socket,然后往epoll內(nèi)核事件表中注冊該socket上的讀寫事件。如果連接socket上有讀寫事件發(fā)生,即有新的客戶請求到來或有數(shù)據(jù)要發(fā)送至客戶端,主線程就將該連接socket插入到請求隊(duì)列中,所有工作線程都睡眠在請求隊(duì)列上,當(dāng)有任務(wù)到來時,他們通過競爭來獲取任務(wù)的接管權(quán)。 由于主線程插入請求隊(duì)列中的任務(wù)是就緒的連接socket,所以該半同步/半反應(yīng)堆模式所采用的事件處理模式是Reactor模式,即工作線程要自己從socket上讀寫數(shù)據(jù)。當(dāng)然,半同步/半反應(yīng)堆模式也可以用模擬的Proactor事件處理模式,即由主線程來完成數(shù)據(jù)的讀寫操作,此時主線程將應(yīng)用程序數(shù)據(jù)、任務(wù)類型等信息封裝為一個任務(wù)對象,然后將其插入到請求隊(duì)列。
半同步/半反應(yīng)堆模式的缺點(diǎn): 主線程和工作線程共享請求隊(duì)列,因而請求隊(duì)列是臨界資源,所以對請求隊(duì)列操作的時候需要加鎖保護(hù)。 每個工作線程在同一時間只能處理一個客戶請求。如果客戶數(shù)量增多,則請求隊(duì)列中堆積任務(wù)太多,客戶端的響應(yīng)會越來越慢。如果增多工作線程的話,則線程的切花也將消耗大量的CPU時間。
高效的半同步/半異步模式
在半同步/半反應(yīng)堆模式中,每個工作線程同時只能處理一個客戶請求,如果并發(fā)量大的話,客戶端響應(yīng)會很慢。如果每個工作線程都能同時處理多個客戶鏈接,則就能改善這種情況,所以就有了高效的半同步/半異步模式。 其結(jié)構(gòu)如圖:
主線程只管監(jiān)聽socket,當(dāng)有新的連接socket到來時,主線程就接受連接并返回新的連接socket給某個工作線程。此后該新連接socket上的任何I/O操作都由被選中的工作線程來處理,直到客戶端關(guān)閉連接。當(dāng)工作線程檢測到有新的連接socket到來時,就把該新的連接socket的讀寫事件注冊到自己的epoll內(nèi)核事件表中。 主線程和工作線程都維持自己的事件循環(huán),他們各自獨(dú)立的監(jiān)聽不同事件。因此在這種高效的半同步/半異步模式中,每個線程都工作在異步模式中,所以它并非嚴(yán)格意義上的半同步/半異步模式。
epoll源碼學(xué)習(xí)
數(shù)據(jù)結(jié)構(gòu)
eventpoll
// epoll的核心實(shí)現(xiàn)對應(yīng)于一個epoll描述符
struct eventpoll {
spinlock_t lock;
struct mutex mtx;
wait_queue_head_t wq; // sys_epoll_wait() 等待在這里
// f_op- >poll() 使用的, 被其他事件通知機(jī)制利用的wait_address
wait_queue_head_t poll_wait;
//已就緒的需要檢查的epitem 列表
struct list_head rdllist;
//保存所有加入到當(dāng)前epoll的文件對應(yīng)的epitem
struct rb_root rbr;
// 當(dāng)正在向用戶空間復(fù)制數(shù)據(jù)時, 產(chǎn)生的可用文件
struct epitem *ovflist;
/* The user that created the eventpoll descriptor */
struct user_struct *user;
struct file *file;
//優(yōu)化循環(huán)檢查,避免循環(huán)檢查中重復(fù)的遍歷
int visited;
struct list_head visited_list_link;
}
epitem
// 對應(yīng)于一個加入到epoll的文件
struct epitem {
// 掛載到eventpoll 的紅黑樹節(jié)點(diǎn)
struct rb_node rbn;
// 掛載到eventpoll.rdllist 的節(jié)點(diǎn)
struct list_head rdllink;
// 連接到ovflist 的指針
struct epitem *next;
/* 文件描述符信息fd + file, 紅黑樹的key */
struct epoll_filefd ffd;
/* Number of active wait queue attached to poll operations */
int nwait;
// 當(dāng)前文件的等待隊(duì)列(eppoll_entry)列表
// 同一個文件上可能會監(jiān)視多種事件,
// 這些事件可能屬于不同的wait_queue中
// (取決于對應(yīng)文件類型的實(shí)現(xiàn)),
// 所以需要使用鏈表
struct list_head pwqlist;
// 當(dāng)前epitem 的所有者
struct eventpoll *ep;
/* List header used to link this item to the "struct file" items list */
struct list_head fllink;
/* epoll_ctl 傳入的用戶數(shù)據(jù) */
struct epoll_event event;
};
eppoll_entry
// 與一個文件上的一個wait_queue_head 相關(guān)聯(lián),因?yàn)橥晃募赡苡卸鄠€等待的事件,
//這些事件可能使用不同的等待隊(duì)列
struct eppoll_entry {
// List struct epitem.pwqlist
struct list_head llink;
// 所有者
struct epitem *base;
// 添加到wait_queue 中的節(jié)點(diǎn)
wait_queue_t wait;
// 文件wait_queue 頭
wait_queue_head_t *whead;
};
函數(shù)接口
epoll_create()
//先進(jìn)行判斷size是否 >=0,若是則直接調(diào)用epoll_create1
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;
return sys_epoll_create1(0);
}
SYSCALL_DEFINE1是一個宏,用于定義有一個參數(shù)的系統(tǒng)調(diào)用函數(shù),上述宏展開后即成為: int sys_epoll_create(int size),這就是epoll_create系統(tǒng)調(diào)用的入口。至于為何要用宏而不是直接聲明,主要是因?yàn)橄到y(tǒng)調(diào)用的參數(shù)個數(shù)、傳參方式都有嚴(yán)格限制,最多六個參數(shù)。
/* 這才是真正的epoll_create啊~~ */
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error;
struct eventpoll *ep = NULL;//主描述符
/* Check the EPOLL_* constant for consistency. */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
/* 對于epoll來講, 目前唯一有效的flag就是CLOEXEC */
if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
/*
* Create the internal data structure ("struct eventpoll").
*/
/* 分配一個struct eventpoll */
error = ep_alloc(&ep);
if (error < 0)
return error;
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
/* 這里是創(chuàng)建一個匿名fd。
epollfd本身并不存在一個真正的文件與之對應(yīng), 所以內(nèi)核需要創(chuàng)建一個
"虛擬"的文件, 并為之分配真正的struct file結(jié)構(gòu), 而且有真正的fd.
這里2個參數(shù)比較關(guān)鍵:
eventpoll_fops, fops就是file operations, 就是當(dāng)你對這個文件(這里是虛擬的)進(jìn)行操作(比如讀)時,
fops里面的函數(shù)指針指向真正的操作實(shí)現(xiàn), 類似C++里面虛函數(shù)和子類的概念.
epoll只實(shí)現(xiàn)了poll和release(就是close)操作, 其它文件系統(tǒng)操作都有VFS全權(quán)處理了.
ep, ep就是struct epollevent, 它會作為一個私有數(shù)據(jù)保存在struct file的private指針里面.
*/
error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (error < 0)
ep_free(ep);
return error;
// epoll 文件系統(tǒng)的相關(guān)實(shí)現(xiàn)
// epoll 文件系統(tǒng)初始化, 在系統(tǒng)啟動時會調(diào)用
static int __init eventpoll_init(void)
{
struct sysinfo si;
si_meminfo(&si);
// 限制可添加到epoll的最多的描述符數(shù)量
max_user_watches = (((si.totalram - si.totalhigh) / 25) < < PAGE_SHIFT) /
EP_ITEM_COST;
BUG_ON(max_user_watches < 0);
// 初始化遞歸檢查隊(duì)列
ep_nested_calls_init(&poll_loop_ncalls);
ep_nested_calls_init(&poll_safewake_ncalls);
ep_nested_calls_init(&poll_readywalk_ncalls);
// epoll 使用的slab分配器分別用來分配epitem和eppoll_entry
epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
0, SLAB_HWCACHE_ALIGN | SLAB_PANIC, NULL);
pwq_cache = kmem_cache_create("eventpoll_pwq",
sizeof(struct eppoll_entry), 0, SLAB_PANIC, NULL);
return 0;
}
epoll_ctl()
//創(chuàng)建好epollfd后, 接下來添加fd
//epoll_ctl的參數(shù):epfd 表示epollfd;op 有ADD,MOD,DEL,
//fd 是需要監(jiān)聽的描述符,event 我們感興趣的events
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
int error;
int did_lock_epmutex = 0;
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
error = -EFAULT;
//錯誤處理以及從用戶空間將epoll_event結(jié)構(gòu)copy到內(nèi)核空間.
if (ep_op_has_event(op) &&
// 復(fù)制用戶空間數(shù)據(jù)到內(nèi)核
copy_from_user(&epds, event, sizeof(struct epoll_event))) {
goto error_return;
}
// 取得 epfd 對應(yīng)的文件
error = -EBADF;
file = fget(epfd);
if (!file) {
goto error_return;
}
// 取得目標(biāo)文件
tfile = fget(fd);
if (!tfile) {
goto error_fput;
}
// 目標(biāo)文件必須提供 poll 操作
error = -EPERM;
if (!tfile- >f_op || !tfile- >f_op- >poll) {
goto error_tgt_fput;
}
// 添加自身或epfd 不是epoll 句柄
error = -EINVAL;
if (file == tfile || !is_file_epoll(file)) {
goto error_tgt_fput;
}
// 取得內(nèi)部結(jié)構(gòu)eventpoll
ep = file- >private_data;
// EPOLL_CTL_MOD 不需要加全局鎖 epmutex
if (op == EPOLL_CTL_ADD || op == EPOLL_CTL_DEL) {
mutex_lock(&epmutex);
did_lock_epmutex = 1;
}
if (op == EPOLL_CTL_ADD) {
if (is_file_epoll(tfile)) {
error = -ELOOP;
// 目標(biāo)文件也是epoll 檢測是否有循環(huán)包含的問題
if (ep_loop_check(ep, tfile) != 0) {
goto error_tgt_fput;
}
} else
{
// 將目標(biāo)文件添加到 epoll 全局的tfile_check_list 中
list_add(&tfile- >f_tfile_llink, &tfile_check_list);
}
}
mutex_lock_nested(&ep- >mtx, 0);
// 以tfile 和fd 為key 在rbtree 中查找文件對應(yīng)的epitem
epi = ep_find(ep, tfile, fd);
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
// 沒找到, 添加額外添加ERR HUP 事件
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds, tfile, fd);
} else {
error = -EEXIST;
}
// 清空文件檢查列表
clear_tfile_check_list();
break;
case EPOLL_CTL_DEL:
if (epi) {
error = ep_remove(ep, epi);
} else {
error = -ENOENT;
}
break;
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else {
error = -ENOENT;
}
break;
}
mutex_unlock(&ep- >mtx);
error_tgt_fput:
if (did_lock_epmutex) {
mutex_unlock(&epmutex);
}
fput(tfile);
error_fput:
fput(file);
error_return:
return error;
}
//ep_insert()在epoll_ctl()中被調(diào)用, 完成往epollfd里面添加一個監(jiān)聽fd的工作
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
int error, revents, pwake = 0;
unsigned long flags;
long user_watches;
struct epitem *epi;
struct ep_pqueue epq;
/*
struct ep_pqueue {
poll_table pt;
struct epitem *epi;
};
*/
// 增加監(jiān)視文件數(shù)
user_watches = atomic_long_read(&ep- >user- >epoll_watches);
if (unlikely(user_watches >= max_user_watches)) {
return -ENOSPC;
}
// 分配初始化 epi
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) {
return -ENOMEM;
}
INIT_LIST_HEAD(&epi- >rdllink);
INIT_LIST_HEAD(&epi- >fllink);
INIT_LIST_HEAD(&epi- >pwqlist);
epi- >ep = ep;
// 初始化紅黑樹中的key
ep_set_ffd(&epi- >ffd, tfile, fd);
// 直接復(fù)制用戶結(jié)構(gòu)
epi- >event = *event;
epi- >nwait = 0;
epi- >next = EP_UNACTIVE_PTR;
// 初始化臨時的 epq
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
// 設(shè)置事件掩碼
epq.pt._key = event- >events;
// 內(nèi)部會調(diào)用ep_ptable_queue_proc, 在文件對應(yīng)的wait queue head 上
// 注冊回調(diào)函數(shù), 并返回當(dāng)前文件的狀態(tài)
revents = tfile- >f_op- >poll(tfile, &epq.pt);
// 檢查錯誤
error = -ENOMEM;
if (epi- >nwait < 0) { // f_op- >poll 過程出錯
goto error_unregister;
}
// 添加當(dāng)前的epitem 到文件的f_ep_links 鏈表
spin_lock(&tfile- >f_lock);
list_add_tail(&epi- >fllink, &tfile- >f_ep_links);
spin_unlock(&tfile- >f_lock);
// 插入epi 到rbtree
ep_rbtree_insert(ep, epi);
/* now check if we've created too many backpaths */
error = -EINVAL;
if (reverse_path_check()) {
goto error_remove_epi;
}
spin_lock_irqsave(&ep- >lock, flags);
/* 文件已經(jīng)就緒插入到就緒鏈表rdllist */
if ((revents & event- >events) && !ep_is_linked(&epi- >rdllink)) {
list_add_tail(&epi- >rdllink, &ep- >rdllist);
if (waitqueue_active(&ep- >wq))
// 通知sys_epoll_wait , 調(diào)用回調(diào)函數(shù)喚醒sys_epoll_wait 進(jìn)程
{
wake_up_locked(&ep- >wq);
}
// 先不通知調(diào)用eventpoll_poll 的進(jìn)程
if (waitqueue_active(&ep- >poll_wait)) {
pwake++;
}
}
spin_unlock_irqrestore(&ep- >lock, flags);
atomic_long_inc(&ep- >user- >epoll_watches);
if (pwake)
// 安全通知調(diào)用eventpoll_poll 的進(jìn)程
{
ep_poll_safewake(&ep- >poll_wait);
}
return 0;
error_remove_epi:
spin_lock(&tfile- >f_lock);
// 刪除文件上的 epi
if (ep_is_linked(&epi- >fllink)) {
list_del_init(&epi- >fllink);
}
spin_unlock(&tfile- >f_lock);
// 從紅黑樹中刪除
rb_erase(&epi- >rbn, &ep- >rbr);
error_unregister:
// 從文件的wait_queue 中刪除, 釋放epitem 關(guān)聯(lián)的所有eppoll_entry
ep_unregister_pollwait(ep, epi);
spin_lock_irqsave(&ep- >lock, flags);
if (ep_is_linked(&epi- >rdllink)) {
list_del_init(&epi- >rdllink);
}
spin_unlock_irqrestore(&ep- >lock, flags);
// 釋放epi
kmem_cache_free(epi_cache, epi);
return error;
}
static unsigned int ep_eventpoll_poll(struct file *file, poll_table *wait)
{
int pollflags;
struct eventpoll *ep = file- >private_data;
// 插入到wait_queue
poll_wait(file, &ep- >poll_wait, wait);
// 掃描就緒的文件列表, 調(diào)用每個文件上的poll 檢測是否真的就緒,
// 然后復(fù)制到用戶空間
// 文件列表中有可能有epoll文件, 調(diào)用poll的時候有可能會產(chǎn)生遞歸,
// 調(diào)用所以用ep_call_nested 包裝一下, 防止死循環(huán)和過深的調(diào)用
pollflags = ep_call_nested(&poll_readywalk_ncalls, EP_MAX_NESTS,
ep_poll_readyevents_proc, ep, ep, current);
// static struct nested_calls poll_readywalk_ncalls;
return pollflags != -1 ? pollflags : 0;
}
// 通用的poll_wait 函數(shù), 文件的f_ops- >poll 通常會調(diào)用此函數(shù)
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p- >_qproc && wait_address) {
// 調(diào)用_qproc 在wait_address 上添加節(jié)點(diǎn)和回調(diào)函數(shù)
// 調(diào)用 poll_table_struct 上的函數(shù)指針向wait_address添加節(jié)點(diǎn), 并設(shè)置節(jié)點(diǎn)的func
// (如果是select或poll 則是 __pollwait, 如果是 epoll 則是 ep_ptable_queue_proc),
p- >_qproc(filp, wait_address, p);
}
}
/*
* 該函數(shù)在調(diào)用f_op- >poll()時會被調(diào)用.
* 也就是epoll主動poll某個fd時, 用來將epitem與指定的fd關(guān)聯(lián)起來的.
* 關(guān)聯(lián)的辦法就是使用等待隊(duì)列(waitqueue)
*/
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
if (epi- >nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
/* 初始化等待隊(duì)列, 指定ep_poll_callback為喚醒時的回調(diào)函數(shù),
* 當(dāng)我們監(jiān)聽的fd發(fā)生狀態(tài)改變時, 也就是隊(duì)列頭被喚醒時,
* 指定的回調(diào)函數(shù)將會被調(diào)用. */
init_waitqueue_func_entry(&pwq- >wait, ep_poll_callback);
pwq- >whead = whead;
pwq- >base = epi;
/* 將剛分配的等待隊(duì)列成員加入到頭中, 頭是由fd持有的 */
add_wait_queue(whead, &pwq- >wait);
list_add_tail(&pwq- >llink, &epi- >pwqlist);
/* nwait記錄了當(dāng)前epitem加入到了多少個等待隊(duì)列中,
* 我認(rèn)為這個值最大也只會是1... */
epi- >nwait++;
} else {
/* We have to signal that an error occurred */
epi- >nwait = -1;
}
}
//回調(diào)函數(shù), 當(dāng)我們監(jiān)聽的fd發(fā)生狀態(tài)改變時, 它會被調(diào)用.
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
//從等待隊(duì)列獲取epitem.需要知道哪個進(jìn)程掛載到這個設(shè)備
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi- >ep;//獲取
spin_lock_irqsave(&ep- >lock, flags);
if (!(epi- >event.events & ~EP_PRIVATE_BITS))
goto out_unlock;
/* 沒有我們關(guān)心的event... */
if (key && !((unsigned long) key & epi- >event.events))
goto out_unlock;
/*
* 這里看起來可能有點(diǎn)費(fèi)解, 其實(shí)干的事情比較簡單:
* 如果該callback被調(diào)用的同時, epoll_wait()已經(jīng)返回了,
* 也就是說, 此刻應(yīng)用程序有可能已經(jīng)在循環(huán)獲取events,
* 這種情況下, 內(nèi)核將此刻發(fā)生event的epitem用一個單獨(dú)的鏈表
* 鏈起來, 不發(fā)給應(yīng)用程序, 也不丟棄, 而是在下一次epoll_wait
* 時返回給用戶.
*/
if (unlikely(ep- >ovflist != EP_UNACTIVE_PTR)) {
if (epi- >next == EP_UNACTIVE_PTR) {
epi- >next = ep- >ovflist;
ep- >ovflist = epi;
}
goto out_unlock;
}
/* 將當(dāng)前的epitem放入ready list */
if (!ep_is_linked(&epi- >rdllink))
list_add_tail(&epi- >rdllink, &ep- >rdllist);
/* 喚醒epoll_wait... */
if (waitqueue_active(&ep- >wq))
wake_up_locked(&ep- >wq);
/* 如果epollfd也在被poll, 那就喚醒隊(duì)列里面的所有成員. */
if (waitqueue_active(&ep- >poll_wait))
pwake++;
out_unlock:
spin_unlock_irqrestore(&ep- >lock, flags);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep- >poll_wait);
return 1;
}
epoll_wait()
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
int error;
struct file *file;
struct eventpoll *ep;
/* The maximum number of event must be greater than zero */
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;
/* Verify that the area passed by the user is writeable */
/* 這個地方有必要說明一下:
* 內(nèi)核對應(yīng)用程序采取的策略是"絕對不信任",
* 所以內(nèi)核跟應(yīng)用程序之間的數(shù)據(jù)交互大都是copy, 不允許(也時候也是不能...)指針引用.
* epoll_wait()需要內(nèi)核返回數(shù)據(jù)給用戶空間, 內(nèi)存由用戶程序提供,
* 所以內(nèi)核會用一些手段來驗(yàn)證這一段內(nèi)存空間是不是有效的.
*/
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
error = -EFAULT;
goto error_return;
}
/* Get the "struct file *" for the eventpoll file */
error = -EBADF;
/* 獲取epollfd的struct file, epollfd也是文件嘛 */
file = fget(epfd);
if (!file)
goto error_return;
error = -EINVAL;
/* 檢查一下它是不是一個真正的epollfd... */
if (!is_file_epoll(file))
goto error_fput;
/* 獲取eventpoll結(jié)構(gòu) */
ep = file- >private_data;
/* 等待事件到來~~ */
error = ep_poll(ep, events, maxevents, timeout);
error_fput:
fput(file);
error_return:
return error;
}
/* 這個函數(shù)真正將執(zhí)行epoll_wait的進(jìn)程帶入睡眠狀態(tài)... */
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;//等待隊(duì)列
/* 計算睡覺時間, 毫秒要轉(zhuǎn)換為HZ */
jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;
retry:
spin_lock_irqsave(&ep- >lock, flags);
res = 0;
/* 如果ready list不為空, 就不睡了, 直接干活... */
if (list_empty(&ep- >rdllist)) {
/* OK, 初始化一個等待隊(duì)列, 準(zhǔn)備直接把自己掛起,
* 注意current是一個宏, 代表當(dāng)前進(jìn)程 */
init_waitqueue_entry(&wait, current);//初始化等待隊(duì)列,wait表示當(dāng)前進(jìn)程
__add_wait_queue_exclusive(&ep- >wq, &wait);//掛載到ep結(jié)構(gòu)的等待隊(duì)列
for (;;) {
/* 將當(dāng)前進(jìn)程設(shè)置位睡眠, 但是可以被信號喚醒的狀態(tài),
* 注意這個設(shè)置是"將來時", 我們此刻還沒睡! */
set_current_state(TASK_INTERRUPTIBLE);
/* 如果這個時候, ready list里面有成員了,
* 或者睡眠時間已經(jīng)過了, 就直接不睡了... */
if (!list_empty(&ep- >rdllist) || !jtimeout)
break;
/* 如果有信號產(chǎn)生, 也起床... */
if (signal_pending(current)) {
res = -EINTR;
break;
}
/* 啥事都沒有,解鎖, 睡覺... */
spin_unlock_irqrestore(&ep- >lock, flags);
/* jtimeout這個時間后, 會被喚醒,
* ep_poll_callback()如果此時被調(diào)用,
* 那么我們就會直接被喚醒, 不用等時間了...
* 再次強(qiáng)調(diào)一下ep_poll_callback()的調(diào)用時機(jī)是由被監(jiān)聽的fd
* 的具體實(shí)現(xiàn), 比如socket或者某個設(shè)備驅(qū)動來決定的,
* 因?yàn)榈却?duì)列頭是他們持有的, epoll和當(dāng)前進(jìn)程
* 只是單純的等待...
**/
jtimeout = schedule_timeout(jtimeout);//睡覺
spin_lock_irqsave(&ep- >lock, flags);
}
__remove_wait_queue(&ep- >wq, &wait);
/* OK 我們醒來了... */
set_current_state(TASK_RUNNING);
}
/* Is it worth to try to dig for events ? */
eavail = !list_empty(&ep- >rdllist) || ep- >ovflist != EP_UNACTIVE_PTR;
spin_unlock_irqrestore(&ep- >lock, flags);
/* 如果一切正常, 有event發(fā)生, 就開始準(zhǔn)備數(shù)據(jù)copy給用戶空間了... */
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;
return res;
}
//調(diào)用p_scan_ready_list()
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;
esed.maxevents = maxevents;
esed.events = events;
return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
}
//由ep_send_events()調(diào)用本函數(shù)
static int ep_scan_ready_list(struct eventpoll *ep,
int (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv)
{
int error, pwake = 0;
unsigned long flags;
struct epitem *epi, *nepi;
LIST_HEAD(txlist);
mutex_lock(&ep- >mtx);
spin_lock_irqsave(&ep- >lock, flags);
/* 這一步要注意, 首先, 所有監(jiān)聽到events的epitem都鏈到rdllist上了,
* 但是這一步之后, 所有的epitem都轉(zhuǎn)移到了txlist上, 而rdllist被清空了,
* 要注意哦, rdllist已經(jīng)被清空了! */
list_splice_init(&ep- >rdllist, &txlist);
/* ovflist, 在ep_poll_callback()里面我解釋過, 此時此刻我們不希望
* 有新的event加入到ready list中了, 保存后下次再處理... */
ep- >ovflist = NULL;
spin_unlock_irqrestore(&ep- >lock, flags);
/* 在這個回調(diào)函數(shù)里面處理每個epitem
* sproc 就是 ep_send_events_proc, 下面會注釋到. */
error = (*sproc)(ep, &txlist, priv);
spin_lock_irqsave(&ep- >lock, flags);
/* 現(xiàn)在我們來處理ovflist, 這些epitem都是我們在傳遞數(shù)據(jù)給用戶空間時
* 監(jiān)聽到了事件. */
for (nepi = ep- >ovflist; (epi = nepi) != NULL;
nepi = epi- >next, epi- >next = EP_UNACTIVE_PTR) {
/* 將這些直接放入readylist */
if (!ep_is_linked(&epi- >rdllink))
list_add_tail(&epi- >rdllink, &ep- >rdllist);
}
ep- >ovflist = EP_UNACTIVE_PTR;
/* 上一次沒有處理完的epitem, 重新插入到ready list */
list_splice(&txlist, &ep- >rdllist);
/* ready list不為空, 直接喚醒... */
if (!list_empty(&ep- >rdllist)) {
if (waitqueue_active(&ep- >wq))
wake_up_locked(&ep- >wq);
if (waitqueue_active(&ep- >poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep- >lock, flags);
mutex_unlock(&ep- >mtx);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep- >poll_wait);
return error;
}
-
服務(wù)器
+關(guān)注
關(guān)注
12文章
9203瀏覽量
85528 -
程序
+關(guān)注
關(guān)注
117文章
3788瀏覽量
81096 -
模型
+關(guān)注
關(guān)注
1文章
3254瀏覽量
48878 -
epoll
+關(guān)注
關(guān)注
0文章
28瀏覽量
2967
發(fā)布評論請先 登錄
相關(guān)推薦
評論