Development environment:
RT-Thread version: 4.0.3
Operating system: Windows 10
Keil version: V5.30
RT-Thread Studio version: 2.0.1
LWIP: 2.0.2
3 Select/Poll Overview
In LWIP, a concurrent server can be built on the Sequential API, but that approach requires multiple threads: one thread is created per connection to handle its data. On resource-constrained embedded devices, creating a thread for every connection is prohibitively expensive, so we need a different approach, namely I/O multiplexing, i.e. the select mechanism.
Select/Poll are specified by POSIX, and most operating systems and protocol stacks implement them.
Note that on Linux both poll and select are built on the kernel function sys_poll; the difference is that select was inherited from BSD Unix while poll came from System V Unix, so the two behave very similarly. poll has no limit on the maximum number of file descriptors. Like select, however, poll copies the whole array of file descriptors between user space and kernel space, so the overhead grows linearly with the number of descriptors.
3.1 The select Function
In the BSD socket API, the prototype of select is:
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
[Parameters]
- nfds: the number of file descriptors select monitors; it is normally set to the largest descriptor value among the monitored sets plus 1.
- readfds: the descriptor set to be watched for readability. When select returns, descriptors that are not readable have been cleared from readfds, leaving only the readable ones.
- writefds: the descriptor set to be watched for writability. When select returns, descriptors that are not writable have been cleared from writefds, leaving only the writable ones.
- exceptfds: the descriptor set to be watched for exceptional conditions; it can also serve other purposes, for example monitoring out-of-band (OOB) data, which is sent to a socket with the MSG_OOB flag. When select returns, the other descriptors have been cleared from exceptfds, leaving only those flagged with OOB data.
- timeout: a pointer to a struct timeval; it makes select() return after waiting for the given time if no descriptor has become ready. The timeval structure specifies the interval in seconds and microseconds. It puts select into one of three modes (see the sketch after the structure definition below):
(1) If NULL is passed, i.e. no time structure is supplied, select blocks until some descriptor in the monitored sets changes state.
(2) If the time is set to 0 seconds and 0 microseconds, select becomes purely non-blocking: it returns immediately whether or not any descriptor has changed, returning 0 if nothing changed and a positive value otherwise.
(3) If timeout is greater than 0, it is the maximum wait: select blocks for at most that long, returning as soon as an event arrives, and in any case once the timeout expires; the return values are as above.
The timeval structure is defined as:
struct timeval
{
int tv_sec; /* seconds */
int tv_usec; /* microseconds */
};
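To make the three timeout modes concrete, here is a minimal sketch using the standard BSD socket API (it is not lwIP-specific); the descriptor fd and the helper name wait_readable are placeholders introduced for illustration only:

```c
#include <sys/select.h>

/* Wait for 'fd' to become readable, demonstrating the three timeout modes. */
static int wait_readable(int fd, int mode)
{
    fd_set rset;
    struct timeval tv;

    FD_ZERO(&rset);
    FD_SET(fd, &rset);

    if (mode == 0) {
        /* (1) NULL timeout: block until fd becomes readable */
        return select(fd + 1, &rset, NULL, NULL, NULL);
    } else if (mode == 1) {
        /* (2) zero timeout: pure polling, returns immediately */
        tv.tv_sec = 0;
        tv.tv_usec = 0;
        return select(fd + 1, &rset, NULL, NULL, &tv);
    } else {
        /* (3) positive timeout: wait at most 2.5 seconds */
        tv.tv_sec = 2;
        tv.tv_usec = 500000;
        return select(fd + 1, &rset, NULL, NULL, &tv);
    }
}
```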
[Return value]
- int: the number of ready descriptors if any are ready, 0 if the call timed out, -1 on error.
The following macros are used to set, clear and test file descriptor sets:
FD_ZERO(fd_set *set); // clear a file descriptor set
FD_SET(int fd, fd_set *set); // add a file descriptor to a set
FD_CLR(int fd, fd_set *set); // remove a file descriptor from a set
FD_ISSET(int fd, fd_set *set); // test whether a file descriptor is set
fd_set can be thought of as a set whose elements are file descriptors (file handles). The three middle parameters of select specify the descriptor sets we want the kernel to test for read, write and exceptional conditions; if you are not interested in one of them, pass a null pointer.
The select() mechanism provides the fd_set data structure, essentially an array of long, in which each element can be associated with an open file handle (a socket, a regular file, a named pipe, a device handle, and so on). Establishing these associations is the programmer's job. When select() is called, the kernel modifies the contents of fd_set according to the I/O state, and in this way tells the process that called select() which socket or file is readable.
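As a quick illustration of these macros, the following sketch (again plain BSD sockets, with sock_a and sock_b as hypothetical descriptors created elsewhere) rebuilds the read set before each call, because the kernel overwrites it, and then uses FD_ISSET to see which descriptor became readable:

```c
#include <sys/select.h>
#include <stdio.h>

/* Check which of two sockets is readable; sock_a and sock_b are
 * placeholders for descriptors created elsewhere. */
static void check_two_sockets(int sock_a, int sock_b)
{
    fd_set rset;
    int maxfd = (sock_a > sock_b) ? sock_a : sock_b;

    /* The kernel modifies the set in place, so rebuild it each time. */
    FD_ZERO(&rset);
    FD_SET(sock_a, &rset);
    FD_SET(sock_b, &rset);

    if (select(maxfd + 1, &rset, NULL, NULL, NULL) > 0) {
        if (FD_ISSET(sock_a, &rset)) {
            printf("sock_a is readable\n");
        }
        if (FD_ISSET(sock_b, &rset)) {
            printf("sock_b is readable\n");
        }
    }
}
```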
3.2 The poll Function
The prototype of poll is:
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
[Parameters]
- fds: an array of struct pollfd holding the socket descriptors whose state is to be checked; the fds array is not cleared after poll returns. Each pollfd structure describes one monitored file descriptor, and passing fds lets poll() monitor several descriptors at once.
The definition of struct pollfd is:
typedef struct pollfd {
int fd; // the file descriptor to be checked or selected
short events; // the events of interest on fd
short revents; // the events that actually occurred on fd
} pollfd_t;
The events field is the mask of events the caller is interested in for that descriptor and is filled in by the user; the revents field is the mask of events that actually occurred, set by the kernel when the call returns.
- nfds: the total number of descriptors in the fds array.
- timeout: the number of milliseconds to wait; after that time poll() returns whether or not any I/O is ready (a negative value means wait indefinitely), which is similar to select's timeout.
[Return value]
- int: the number of descriptors in fds that are ready for reading or writing or have an error; 0 means the call timed out and -1 means an error occurred.
poll describes the descriptor set differently: it uses pollfd structures instead of select's fd_set, so the number of descriptors poll can handle is not capped by select's limit of 1024. This is another difference from select.
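For comparison with select, here is a minimal sketch of the typical poll usage on a POSIX system; listen_fd and conn_fd are assumed to be sockets created elsewhere, and the function name is made up for illustration:

```c
#include <poll.h>
#include <stdio.h>

/* Watch a listening socket and one connection at the same time. */
static void poll_two_sockets(int listen_fd, int conn_fd)
{
    struct pollfd fds[2];

    fds[0].fd = listen_fd;
    fds[0].events = POLLIN;   /* interested in "readable" (new connection) */
    fds[1].fd = conn_fd;
    fds[1].events = POLLIN;   /* interested in incoming data */

    /* Wait up to 1000 ms; a negative timeout would mean "wait forever". */
    int nready = poll(fds, 2, 1000);
    if (nready < 0) {
        perror("poll");
        return;
    }
    if (nready == 0) {
        printf("poll timed out\n");
        return;
    }

    /* The kernel reports what actually happened in revents. */
    if (fds[0].revents & POLLIN) {
        printf("listen_fd is ready: a client is waiting to be accepted\n");
    }
    if (fds[1].revents & POLLIN) {
        printf("conn_fd is ready: data can be read without blocking\n");
    }
}
```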
4 select/poll Implementation in LWIP
Now let's see how LWIP implements select and poll.
4.1 lwip_select Implementation
LWIP now implements select completely. The implementation is based on semaphores, and the function is named lwip_select.
The basic flow of LWIP's select implementation is:
1. Check the event flags of each socket in the monitored sets in turn; if a flag is set, record that socket.
2. If one or more events are already pending, return; otherwise create a semaphore and block on it. The structure that keeps track of waiting semaphores is select_cb_list, a linked list defined in [sockets.c]:
static struct lwip_select_cb *select_cb_list; // linked list that manages waiting select calls
The lwip_select_cb structure is defined as follows:
/** Description for a task waiting in select */
struct lwip_select_cb {
/** Pointer to the next waiting task */
struct lwip_select_cb *next;
/** Pointer to the previous waiting task */
struct lwip_select_cb *prev;
#if LWIP_SOCKET_SELECT
/** readset passed to select */
fd_set *readset;
/** writeset passed to select */
fd_set *writeset;
/** unimplemented: exceptset passed to select */
fd_set *exceptset;
#endif /* LWIP_SOCKET_SELECT */
#if LWIP_SOCKET_POLL
/** fds passed to poll; NULL if select */
struct pollfd *poll_fds;
/** nfds passed to poll; 0 if select */
nfds_t poll_nfds;
#endif /* LWIP_SOCKET_POLL */
/** don't signal the same semaphore twice: set to 1 when signalled */
int sem_signalled; // set to 1 once the semaphore has been signalled
/** semaphore to wake up a task waiting for select */
SELECT_SEM_T sem; // the semaphore the blocked select waits on
};
3. When the socket set is initialized, the callback function event_callback is registered with the netconn structure. When an event occurs, the callback is executed; it walks select_cb_list, and if the socket is found in a waiting entry, that entry's semaphore is released.
Now let's look at the concrete implementation of select in LWIP:
int lwip_select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,
struct timeval *timeout)
{
u32_t waitres = 0; // records the result of the semaphore wait (time waited or timeout)
int nready;
fd_set lreadset, lwriteset, lexceptset; // local sets recording the sockets on which events occurred
u32_t msectimeout;
int i;
int maxfdp2;
#if LWIP_NETCONN_SEM_PER_THREAD
int waited = 0;
#endif
#if LWIP_NETCONN_FULLDUPLEX
fd_set used_sockets;
#endif
SYS_ARCH_DECL_PROTECT(lev);
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select(%d, %p, %p, %p, tvsec=%"S32_F" tvusec=%"S32_F")\n",
maxfdp1, (void *)readset, (void *) writeset, (void *) exceptset,
timeout ? (s32_t)timeout->tv_sec : (s32_t) - 1,
timeout ? (s32_t)timeout->tv_usec : (s32_t) - 1));
if ((maxfdp1 < 0) || (maxfdp1 > LWIP_SELECT_MAXNFDS)) {
set_errno(EINVAL);
return -1;
}
lwip_select_inc_sockets_used(maxfdp1, readset, writeset, exceptset, &used_sockets);
/* Go through each socket in each list to count number of sockets which
currently match */
// scan the socket sets for sockets that already have events
nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);
if (nready < 0) {
/* one of the sockets in one of the fd_sets was invalid */
set_errno(EBADF);
lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
return -1;
} else if (nready > 0) {
/* one or more sockets are set, no need to wait */
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: nready=%d\n", nready));
} else {
/* If we don't have any current events, then suspend if we are supposed to */
if (timeout && timeout->tv_sec == 0 && timeout->tv_usec == 0) {
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: no timeout, returning 0\n"));
/* This is OK as the local fdsets are empty and nready is zero,
or we would have returned earlier. */
} else {
/* None ready: add our semaphore to list:
We don't actually need any dynamic memory. Our entry on the
list is only valid while we are in this function, so it's ok
to use local variables (unless we're running in MPU compatible
mode). */
API_SELECT_CB_VAR_DECLARE(select_cb);
API_SELECT_CB_VAR_ALLOC(select_cb, set_errno(ENOMEM); lwip_select_dec_sockets_used(maxfdp1, &used_sockets); return -1);
memset(&API_SELECT_CB_VAR_REF(select_cb), 0, sizeof(struct lwip_select_cb));
API_SELECT_CB_VAR_REF(select_cb).readset = readset;
API_SELECT_CB_VAR_REF(select_cb).writeset = writeset;
API_SELECT_CB_VAR_REF(select_cb).exceptset = exceptset;
#if LWIP_NETCONN_SEM_PER_THREAD
API_SELECT_CB_VAR_REF(select_cb).sem = LWIP_NETCONN_THREAD_SEM_GET();
#else /* LWIP_NETCONN_SEM_PER_THREAD */
if (sys_sem_new(&API_SELECT_CB_VAR_REF(select_cb).sem, 0) != ERR_OK) {
/* failed to create semaphore */
set_errno(ENOMEM);
lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
API_SELECT_CB_VAR_FREE(select_cb);
return -1;
}
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
lwip_link_select_cb(&API_SELECT_CB_VAR_REF(select_cb));
/* Increase select_waiting for each socket we are interested in */
maxfdp2 = maxfdp1;
for (i = LWIP_SOCKET_OFFSET; i < maxfdp1; i++) {
if ((readset && FD_ISSET(i, readset)) ||
(writeset && FD_ISSET(i, writeset)) ||
(exceptset && FD_ISSET(i, exceptset))) {
struct lwip_sock *sock;
SYS_ARCH_PROTECT(lev);
sock = tryget_socket_unconn_locked(i);
if (sock != NULL) {
sock->select_waiting++; // the socket exists and is watched for read/write/exception events, so increment select_waiting
if (sock->select_waiting == 0) {
/* overflow - too many threads waiting */
sock->select_waiting--;
nready = -1;
maxfdp2 = i;
SYS_ARCH_UNPROTECT(lev);
done_socket(sock);
set_errno(EBUSY);
break;
}
SYS_ARCH_UNPROTECT(lev);
done_socket(sock);
} else {
/* Not a valid socket */
nready = -1;
maxfdp2 = i;
SYS_ARCH_UNPROTECT(lev);
set_errno(EBADF);
break;
}
}
}
if (nready >= 0) {
/* Call lwip_selscan again: there could have been events between
the last scan (without us on the list) and putting us on the list! */
// after the steps above, scan once more in case events arrived in the meantime
nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);
if (!nready) {
/* Still none ready, just wait to be woken */
if (timeout == 0) {
/* Wait forever */
msectimeout = 0;
} else {
long msecs_long = ((timeout->tv_sec * 1000) + ((timeout->tv_usec + 500) / 1000));
if (msecs_long <= 0) {
/* Wait 1ms at least (0 means wait forever) */
msectimeout = 1;
} else {
msectimeout = (u32_t)msecs_long;
}
}
// sleep for the computed time, yielding the CPU
waitres = sys_arch_sem_wait(SELECT_SEM_PTR(API_SELECT_CB_VAR_REF(select_cb).sem), msectimeout);
#if LWIP_NETCONN_SEM_PER_THREAD
waited = 1;
#endif
}
}
/* Decrease select_waiting for each socket we are interested in */
for (i = LWIP_SOCKET_OFFSET; i < maxfdp2; i++) {
if ((readset && FD_ISSET(i, readset)) ||
(writeset && FD_ISSET(i, writeset)) ||
(exceptset && FD_ISSET(i, exceptset))) {
struct lwip_sock *sock;
SYS_ARCH_PROTECT(lev);
sock = tryget_socket_unconn_locked(i);
if (sock != NULL) {
/* for now, handle select_waiting==0... */
LWIP_ASSERT("sock->select_waiting > 0", sock->select_waiting > 0);
if (sock->select_waiting > 0) {
sock->select_waiting--; // the wait is over, so decrement select_waiting for this socket
}
SYS_ARCH_UNPROTECT(lev);
done_socket(sock);
} else {
SYS_ARCH_UNPROTECT(lev);
/* Not a valid socket */
nready = -1;
set_errno(EBADF);
}
}
}
lwip_unlink_select_cb(&API_SELECT_CB_VAR_REF(select_cb));
#if LWIP_NETCONN_SEM_PER_THREAD
if (API_SELECT_CB_VAR_REF(select_cb).sem_signalled && (!waited || (waitres == SYS_ARCH_TIMEOUT))) {
/* don't leave the thread-local semaphore signalled */
sys_arch_sem_wait(API_SELECT_CB_VAR_REF(select_cb).sem, 1);
}
#else /* LWIP_NETCONN_SEM_PER_THREAD */
sys_sem_free(&API_SELECT_CB_VAR_REF(select_cb).sem);
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
API_SELECT_CB_VAR_FREE(select_cb);
if (nready < 0) {
/* This happens when a socket got closed while waiting */
lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
return -1;
}
if (waitres == SYS_ARCH_TIMEOUT) {
/* Timeout */
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: timeout expired\n"));
/* This is OK as the local fdsets are empty and nready is zero,
or we would have returned earlier. */
} else {
/* See what's set now after waiting */
nready = lwip_selscan(maxfdp1, readset, writeset, exceptset, &lreadset, &lwriteset, &lexceptset);
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_select: nready=%d\n", nready));
}
}
}
lwip_select_dec_sockets_used(maxfdp1, &used_sockets);
set_errno(0);
if (readset) {
*readset = lreadset;
}
if (writeset) {
*writeset = lwriteset;
}
if (exceptset) {
*exceptset = lexceptset;
}
return nready;
}
The most important part of the code above is where socket->select_waiting is incremented and decremented. When the socket exists and really needs to be watched, and no event had already occurred and the call had not already timed out on entry, the counter is always incremented, and the thread may then go to sleep. Normally, when the sleep ends, socket->select_waiting is decremented again, so by the time the function returns the counter is back at its original value. If, however, close(socket) is called during the sleep, tryget_socket_unconn_locked() can no longer find the socket structure, so the counter is not decremented; the remaining statements run, the function returns, and socket->select_waiting ends up one larger than when the call entered. Within this function the number of increments is always greater than or equal to the number of decrements, so once the counter fails to return to its original value it can never drop back to 0. The socket resource is then falsely marked as in use and can never be used by anyone else again.
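The problematic usage pattern looks roughly like the sketch below. It is a hypothetical illustration in RT-Thread style (shared_fd and the thread entry names are invented, and the header set is assumed to match the server example later in this article), not code taken from the article: one thread blocks in select() on a socket while another thread closes that same socket.

```c
#include <rtthread.h>
#include <sys/socket.h>

static int shared_fd; /* assumed: a connected socket created elsewhere */

static void select_thread_entry(void *parameter)
{
    fd_set rset;

    FD_ZERO(&rset);
    FD_SET(shared_fd, &rset);
    /* blocks here; lwIP increments select_waiting for shared_fd */
    select(shared_fd + 1, &rset, RT_NULL, RT_NULL, RT_NULL);
}

static void closer_thread_entry(void *parameter)
{
    rt_thread_mdelay(1000);
    /* closing while the other thread is still inside select() means the
     * decrement path cannot find the socket again: select_waiting leaks */
    closesocket(shared_fd);
}
```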
(Flowchart: the detailed flow of the lwip_select implementation.)
The select implementation relies on an important structure, struct lwip_sock, defined as follows:
/** Contains all internal pointers and states used for a socket */
struct lwip_sock {
/** sockets currently are built on netconns, each socket has one netconn */
struct netconn *conn;
/** data that was left from the previous read */
union lwip_sock_lastdata lastdata;
#if LWIP_SOCKET_SELECT || LWIP_SOCKET_POLL
/** number of times data was received, set by event_callback(),
tested by the receive and select functions */
s16_t rcvevent;
/** number of times data was ACKed (free send buffer), set by event_callback(),
tested by select */
u16_t sendevent;
/** error happened for this socket, set by event_callback(), tested by select */
u16_t errevent;
/** counter of how many threads are waiting for this socket using select */
SELWAIT_T select_waiting;
#endif /* LWIP_SOCKET_SELECT || LWIP_SOCKET_POLL */
#if LWIP_NETCONN_FULLDUPLEX
/* counter of how many threads are using a struct lwip_sock (not the 'int') */
u8_t fd_used;
/* status of pending close/delete actions */
u8_t fd_free_pending;
#define LWIP_SOCK_FD_FREE_TCP 1
#define LWIP_SOCK_FD_FREE_FREE 2
#endif
#ifdef SAL_USING_POSIX
rt_wqueue_t wait_head;
#endif
};
When a socket receives data, lwip_sock uses the netconn receive functions to obtain a pbuf (for TCP) or a netbuf (for UDP). The data wrapped in these buffers may be larger than the length the user asked to read, in which case the packet has to be kept in the socket until the next read: lastdata points to the packet the user has not fully consumed (older lwIP versions also kept a separate lastoffset field recording the offset of the unread data within the packet). The rcvevent, sendevent, errevent and select_waiting fields at the end of lwip_sock exist to support the select mechanism.
struct lwip_sock is the socket layer's view of a connection in the upper-level socket API: it wraps and extends the netconn structure and implements all of its logic on top of the kernel netconn; the conn field points to the netconn that corresponds to the socket. The netconn structure is defined as follows:
/** A callback prototype to inform about events for a netconn */
typedef void (* netconn_callback)(struct netconn *, enum netconn_evt, u16_t len);
/** A netconn descriptor */
struct netconn {
/** type of the netconn (TCP, UDP or RAW) */
enum netconn_type type;
/** current state of the netconn */
enum netconn_state state;
/** the lwIP internal protocol control block */
union {
struct ip_pcb *ip;
struct tcp_pcb *tcp;
struct udp_pcb *udp;
struct raw_pcb *raw;
} pcb;
/** the last asynchronous unreported error this netconn had */
err_t pending_err;
#if !LWIP_NETCONN_SEM_PER_THREAD
/** sem that is used to synchronously execute functions in the core context */
sys_sem_t op_completed;
#endif
/** mbox where received packets are stored until they are fetched
by the netconn application thread (can grow quite big) */
sys_mbox_t recvmbox;
#if LWIP_TCP
/** mbox where new connections are stored until processed
by the application thread */
sys_mbox_t acceptmbox;
#endif /* LWIP_TCP */
#if LWIP_NETCONN_FULLDUPLEX
/** number of threads waiting on an mbox. This is required to unblock
all threads when closing while threads are waiting. */
int mbox_threads_waiting;
#endif
/** only used for socket layer */
#if LWIP_SOCKET
int socket;
#endif /* LWIP_SOCKET */
#if LWIP_SO_SNDTIMEO
/** timeout to wait for sending data (which means enqueueing data for sending
in internal buffers) in milliseconds */
s32_t send_timeout;
#endif /* LWIP_SO_RCVTIMEO */
#if LWIP_SO_RCVTIMEO
/** timeout in milliseconds to wait for new data to be received
(or connections to arrive for listening netconns) */
u32_t recv_timeout;
#endif /* LWIP_SO_RCVTIMEO */
#if LWIP_SO_RCVBUF
/** maximum amount of bytes queued in recvmbox
not used for TCP: adjust TCP_WND instead! */
int recv_bufsize;
/** number of bytes currently in recvmbox to be received,
tested against recv_bufsize to limit bytes on recvmbox
for UDP and RAW, used for FIONREAD */
int recv_avail;
#endif /* LWIP_SO_RCVBUF */
#if LWIP_SO_LINGER
/** values <0 mean linger is disabled, values > 0 are seconds to linger */
s16_t linger;
#endif /* LWIP_SO_LINGER */
/** flags holding more netconn-internal state, see NETCONN_FLAG_* defines */
u8_t flags;
#if LWIP_TCP
/** TCP: when data passed to netconn_write doesn't fit into the send buffer,
this temporarily stores the message.
Also used during connect and close. */
struct api_msg *current_msg;
#endif /* LWIP_TCP */
/** A callback function that is informed about events for this netconn */
netconn_callback callback;
};
As mentioned earlier, when the socket set is initialized, the callback event_callback is registered with the netconn structure; this callback is stored in the netconn_callback member of struct netconn. Let's look at the event_callback function:
/**
* Callback registered in the netconn layer for each socket-netconn.
* Processes recvevent (data available) and wakes up tasks waiting for select.
*
* @note for LWIP_TCPIP_CORE_LOCKING any caller of this function
* must have the core lock held when signaling the following events
* as they might cause select_list_cb to be checked:
* NETCONN_EVT_RCVPLUS: raised when data has been received by the stack
* NETCONN_EVT_SENDPLUS: raised when data has been sent successfully
* NETCONN_EVT_ERROR: raised when a connection error occurs
* This requirement will be asserted in select_check_waiters()
*/
static void
event_callback(struct netconn *conn, enum netconn_evt evt, u16_t len)
{
int s, check_waiters;
struct lwip_sock *sock;
SYS_ARCH_DECL_PROTECT(lev);
LWIP_UNUSED_ARG(len);
/* Get socket */
if (conn) {
s = conn->socket;
if (s < 0) {
/* Data comes in right away after an accept, even though
* the server task might not have created a new socket yet.
* Just count down (or up) if that's the case and we
* will use the data later. Note that only receive events
* can happen before the new socket is set up. */
SYS_ARCH_PROTECT(lev);
if (conn->socket < 0) {
if (evt == NETCONN_EVT_RCVPLUS) {
/* conn->socket is -1 on initialization
lwip_accept adjusts sock->recvevent if conn->socket < -1 */
conn->socket--;
}
SYS_ARCH_UNPROTECT(lev);
return;
}
s = conn->socket;
SYS_ARCH_UNPROTECT(lev);
}
sock = get_socket(s); // get the lwip_sock structure for this descriptor
if (!sock) {
return;
}
} else {
return;
}
check_waiters = 1;
// enter the critical section and update the socket's event counters according to the event
SYS_ARCH_PROTECT(lev);
/* Set event as required */
switch (evt) {
case NETCONN_EVT_RCVPLUS: // data has been received by the stack
sock->rcvevent++;
if (sock->rcvevent > 1) {
check_waiters = 0;
}
break;
case NETCONN_EVT_RCVMINUS: // data has been read by the user
sock->rcvevent--;
check_waiters = 0;
break;
case NETCONN_EVT_SENDPLUS: // output was sent successfully
if (sock->sendevent) {
check_waiters = 0;
}
sock->sendevent = 1;
break;
case NETCONN_EVT_SENDMINUS: // the user wrote data into the send buffer
sock->sendevent = 0;
check_waiters = 0;
break;
case NETCONN_EVT_ERROR: // connection error
sock->errevent = 1;
break;
default:
LWIP_ASSERT("unknown event", 0);
break;
}
// events updated: wake up any select call blocked on this socket
if (sock->select_waiting && check_waiters) {
/* Save which events are active */
int has_recvevent, has_sendevent, has_errevent;
has_recvevent = sock->rcvevent > 0; // data is available to read
has_sendevent = sock->sendevent != 0; // the socket is writable
has_errevent = sock->errevent != 0; // an error/exception occurred
SYS_ARCH_UNPROTECT(lev);
/* Check any select calls waiting on this socket */
select_check_waiters(s, has_recvevent, has_sendevent, has_errevent);
} else {
SYS_ARCH_UNPROTECT(lev);
}
done_socket(sock);
}
To sum up, event_callback is what services the monitoring of readset, writeset and exceptset: it updates rcvevent, sendevent and errevent and signals the semaphore that the blocked lwip_select is waiting on. lwip_select, in essence, reads rcvevent, sendevent and errevent and acts on them, scanning for events mainly through lwip_selscan.
4.2 lwip_poll Implementation
LWIP also implements poll completely; the function is lwip_poll. Its mechanism is much the same as lwip_select's, except that the descriptors are stored in an array of pollfd structures whose length is supplied by the caller, so lwip_poll has no fixed limit on the number of file descriptors. The lwip_poll function is as follows:
int lwip_poll(struct pollfd *fds, nfds_t nfds, int timeout)
{
u32_t waitres = 0;
int nready;
u32_t msectimeout;
#if LWIP_NETCONN_SEM_PER_THREAD
int waited = 0;
#endif
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll(%p, %d, %d)\n",
(void*)fds, (int)nfds, timeout));
LWIP_ERROR("lwip_poll: invalid fds", ((fds != NULL && nfds > 0) || (fds == NULL && nfds == 0)),
set_errno(EINVAL); return -1;);
lwip_poll_inc_sockets_used(fds, nfds);
/* Go through each struct pollfd to count number of structures
which currently match */
nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_CLEAR);
if (nready < 0) {
lwip_poll_dec_sockets_used(fds, nfds);
return -1;
}
/* If we don't have any current events, then suspend if we are supposed to */
if (!nready) {
API_SELECT_CB_VAR_DECLARE(select_cb);
if (timeout == 0) {
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: no timeout, returning 0\n"));
goto return_success;
}
API_SELECT_CB_VAR_ALLOC(select_cb, set_errno(EAGAIN); lwip_poll_dec_sockets_used(fds, nfds); return -1);
memset(&API_SELECT_CB_VAR_REF(select_cb), 0, sizeof(struct lwip_select_cb));
/* None ready: add our semaphore to list:
We don't actually need any dynamic memory. Our entry on the
list is only valid while we are in this function, so it's ok
to use local variables. */
API_SELECT_CB_VAR_REF(select_cb).poll_fds = fds;
API_SELECT_CB_VAR_REF(select_cb).poll_nfds = nfds;
#if LWIP_NETCONN_SEM_PER_THREAD
API_SELECT_CB_VAR_REF(select_cb).sem = LWIP_NETCONN_THREAD_SEM_GET();
#else /* LWIP_NETCONN_SEM_PER_THREAD */
if (sys_sem_new(&API_SELECT_CB_VAR_REF(select_cb).sem, 0) != ERR_OK) {
/* failed to create semaphore */
set_errno(EAGAIN);
lwip_poll_dec_sockets_used(fds, nfds);
API_SELECT_CB_VAR_FREE(select_cb);
return -1;
}
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
lwip_link_select_cb(&API_SELECT_CB_VAR_REF(select_cb));
/* Increase select_waiting for each socket we are interested in.
Also, check for events again: there could have been events between
the last scan (without us on the list) and putting us on the list! */
nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_INC_WAIT);
if (!nready) {
/* Still none ready, just wait to be woken */
if (timeout < 0) {
/* Wait forever */
msectimeout = 0;
} else {
/* timeout == 0 would have been handled earlier. */
LWIP_ASSERT("timeout > 0", timeout > 0);
msectimeout = timeout;
}
waitres = sys_arch_sem_wait(SELECT_SEM_PTR(API_SELECT_CB_VAR_REF(select_cb).sem), msectimeout);
#if LWIP_NETCONN_SEM_PER_THREAD
waited = 1;
#endif
}
/* Decrease select_waiting for each socket we are interested in,
and check which events occurred while we waited. */
nready = lwip_pollscan(fds, nfds, LWIP_POLLSCAN_DEC_WAIT);
lwip_unlink_select_cb(&API_SELECT_CB_VAR_REF(select_cb));
#if LWIP_NETCONN_SEM_PER_THREAD
if (select_cb.sem_signalled && (!waited || (waitres == SYS_ARCH_TIMEOUT))) {
/* don't leave the thread-local semaphore signalled */
sys_arch_sem_wait(API_SELECT_CB_VAR_REF(select_cb).sem, 1);
}
#else /* LWIP_NETCONN_SEM_PER_THREAD */
sys_sem_free(&API_SELECT_CB_VAR_REF(select_cb).sem);
#endif /* LWIP_NETCONN_SEM_PER_THREAD */
API_SELECT_CB_VAR_FREE(select_cb);
if (nready < 0) {
/* This happens when a socket got closed while waiting */
lwip_poll_dec_sockets_used(fds, nfds);
return -1;
}
if (waitres == SYS_ARCH_TIMEOUT) {
/* Timeout */
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: timeout expired\n"));
goto return_success;
}
}
LWIP_DEBUGF(SOCKETS_DEBUG, ("lwip_poll: nready=%d\n", nready));
return_success:
lwip_poll_dec_sockets_used(fds, nfds);
set_errno(0);
return nready;
}
Like lwip_select, lwip_poll scans for events; the only difference is that the scanning function is lwip_pollscan. The remaining details are not analysed further here; if you are interested, please read the LWIP source code.
(Flowchart: the detailed flow of the lwip_poll implementation.)
5 Concurrent Server Implementation
The previous sections explained how select/poll are implemented in LWIP; now let's use them to build a concurrent server, taking select as the example.
The select-based concurrent server model:
socket(...); // create the socket
bind(...); // bind
listen(...); // listen
while(1)
{
if(select(...) > 0) // check whether the listening socket is readable
{
if(FD_ISSET(...)>0) // the socket is readable, so a new client is connecting
{
accept(...); // take the completed connection
process(...); // handle the request and send back the result
}
}
close(...); // close the connection socket returned by accept()
}
The select-based concurrent server is therefore organized as sketched above.
In terms of flow, issuing I/O requests with select is not much different from the synchronous blocking model; it even adds the extra work of registering the sockets to monitor and calling select, so for a single connection it is actually less efficient. The real advantage of select is that one thread can handle I/O requests on many sockets at the same time: the user registers several sockets and then repeatedly calls select to pick up the ones that have become ready, achieving multiplexed I/O within a single thread. With the synchronous blocking model, multiple threads are the only way to achieve the same thing.
Server:
/**
******************************************************************************
* @file server.c
* @author BruceOu
* @rtt version V4.0.3
* @version V1.0
* @date 2022-06-08
* @blog https://blog.bruceou.cn/
* @Official Accounts 嵌入式實驗樓
* @brief select-based server
******************************************************************************
*/
/* The header names were stripped from the original post; these are the
 * headers this code needs under RT-Thread's SAL (assumed reconstruction). */
#include <rtthread.h>
#include <string.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netdev.h>
#define SERVER_PORT 8888
#define BUFF_SIZE 1024
static char recvbuff[BUFF_SIZE];
static void net_server_thread_entry(void *parameter)
{
int sfd, cfd, maxfd, i, nready, n;
struct sockaddr_in server_addr, client_addr;
struct netdev *netdev = RT_NULL;
char sendbuff[] = "Hello client!";
socklen_t client_addr_len;
fd_set all_set, read_set;
//FD_SETSIZE includes the server's own fd, hence the -1 below
int clientfds[FD_SETSIZE - 1];
/* get the netdev network interface object by name */
netdev = netdev_get_by_name((char*)parameter);
if (netdev == RT_NULL)
{
rt_kprintf("get network interface device(%s) failed.\n", (char*)parameter);
}
//create the socket
if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
rt_kprintf("Socket create failed.\n");
}
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
//server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
/* use the IP address held by the netdev object */
server_addr.sin_addr.s_addr = netdev->ip_addr.addr;
//bind the socket
if (bind(sfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) < 0)
{
rt_kprintf("socket bind failed.\n");
closesocket(sfd);
}
rt_kprintf("socket bind network interface device(%s) success!\n", netdev->name);
//listen on the socket
if(listen(sfd, 5) == -1)
{
rt_kprintf("listen error");
}
else
{
rt_kprintf("listening...\n");
}
client_addr_len = sizeof(client_addr);
//initialize maxfd to sfd
maxfd = sfd;
//clear the fd_set
FD_ZERO(&all_set);
//add the listening descriptor sfd to the set
FD_SET(sfd, &all_set);
//initialize the client fd array
for(i = 0; i < FD_SETSIZE -1 ; i++)
{
//mark the slot as unused
clientfds[i] = -1;
}
while(1)
{
//select modifies the fd_set it is given, so the same set cannot be reused across calls;
//keep a master set (all_set) and copy it into the working set (read_set) before each call
read_set = all_set;
nready = select(maxfd + 1, &read_set, NULL, NULL, NULL);
//no timeout is given, so select will not return 0
if(nready < 0)
{
rt_kprintf("select error \r\n");
}
//check whether the listening socket is readable
if(FD_ISSET(sfd, &read_set))
{
//a new client is connecting
cfd = accept(sfd, (struct sockaddr *)&client_addr, &client_addr_len);
if(cfd < 0)
{
rt_kprintf("accept socket error\r\n");
//go back to select
continue;
}
rt_kprintf("new client connect fd = %d\r\n", cfd);
//add the new cfd to the master fd_set
FD_SET(cfd, &all_set);
//update maxfd for the next select
maxfd = (cfd > maxfd)?cfd:maxfd;
//store the new cfd in the clientfds array
for(i = 0; i < FD_SETSIZE -1 ; i++)
{
if(clientfds[i] == -1)
{
clientfds[i] = cfd;
//slot found, no need to keep searching
break;
}
}
//no other socket needs handling: skip the client loop to avoid useless work
if(--nready == 0)
{
//go back to select
continue;
}
}
//iterate over all client file descriptors
for(i = 0; i < FD_SETSIZE -1 ; i++)
{
if(clientfds[i] == -1)
{
//skip unused slots
continue;
}
//check whether this client descriptor is ready
if(FD_ISSET(clientfds[i], &read_set))
{
n = recv(clientfds[i], recvbuff, sizeof(recvbuff) - 1, 0);
if(n <= 0)
{
//the client closed the connection or an error occurred: remove it from the set
FD_CLR(clientfds[i], &all_set);
//mark the slot as unused
clientfds[i] = -1;
}
else
{
//terminate the received data before printing it
recvbuff[n] = '\0';
rt_kprintf("clientfd %d: %s \r\n", clientfds[i], recvbuff);
//echo a reply back to the client
n = send(clientfds[i], sendbuff, strlen(sendbuff), 0);
if(n < 0)
{
//send failed: remove the client from the set
FD_CLR(clientfds[i], &all_set);
//mark the slot as unused
clientfds[i] = -1;
}
}
}
}
}
}
static int server(int argc, char **argv)
{
rt_err_t ret = RT_EOK;
if (argc != 2)
{
rt_kprintf("bind_test [netdev_name]? --bind network interface device by name.\n");
return -RT_ERROR;
}
/* create the server thread */
rt_thread_t thread = rt_thread_create("server",
net_server_thread_entry,
argv[1],
4096,
10,
10);
/* start the thread if it was created successfully */
if (thread != RT_NULL)
{
rt_thread_startup(thread);
}
else
{
ret = RT_ERROR;
}
return ret;
}
#ifdef FINSH_USING_MSH
#include <finsh.h>
MSH_CMD_EXPORT(server, network interface device test);
#endif /* FINSH_USING_MSH */
Client (Linux version):
/* The header names were stripped from the original post; these are the
 * standard headers this code needs (assumed reconstruction). */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define SERVPORT 8888
int main(int argc,char *argv[])
{
char sendbuf[] = "Client1 : Hello Rtthread!";
char recvbuf[2014];
int sockfd,sendbytes;
struct sockaddr_in serv_addr; //address of the server to connect to
if (argc != 2)
{
fprintf(stderr, "usage: %s <server_ip>\n", argv[0]);
exit(1);
}
//1. create the socket
//AF_INET means IPv4
//SOCK_STREAM means TCP
if((sockfd = socket(AF_INET,SOCK_STREAM,0)) < 0)
{
perror("socket");
exit(1);
}
//fill in the server address
serv_addr.sin_family = AF_INET; //network-layer protocol: IPv4
serv_addr.sin_port = htons(SERVPORT); //transport-layer port number
serv_addr.sin_addr.s_addr = inet_addr(argv[1]); //the actual server IP address
bzero(&(serv_addr.sin_zero), 8); //zero the reserved 8 bytes
//2. initiate the connection to the server
//the three-way handshake; the sockaddr_in structure must be cast to sockaddr
if((connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(struct sockaddr))) < 0) {
perror("connect failed!");
exit(1);
}
printf("connect successful! \n");
//3. exchange messages with the server
while (1)
{
send(sockfd, sendbuf, strlen(sendbuf), 0);
//clear the buffer and leave room for a terminating NUL before receiving
memset(recvbuf, 0, sizeof(recvbuf));
recv(sockfd, recvbuf, sizeof(recvbuf) - 1, 0);
printf("Server : %s \n", recvbuf);
sleep(2);
}
//4. close the socket
close(sockfd);
}
Client (RT-Thread version):
/**
******************************************************************************
* @file client.c
* @author BruceOu
* @rtt version V4.0.3
* @version V1.0
* @date 2022-06-01
* @blog https://blog.bruceou.cn/
* @Official Accounts 嵌入式實驗樓
* @brief client
******************************************************************************
*/
/* The header names were stripped from the original post; these are the
 * headers this code needs under RT-Thread's SAL (assumed reconstruction). */
#include <rtthread.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netdev.h>
#define SERVER_HOST "192.168.101.8"
#define SERVER_PORT 8888
static int client(int argc, char **argv)
{
struct sockaddr_in client_addr;
struct sockaddr_in server_addr;
struct netdev *netdev = RT_NULL;
int sockfd = -1;
char sendbuf[] = "Hello RT-Thread! \r\n";
char recvbuf[2014];
if (argc != 2)
{
rt_kprintf("bind_test [netdev_name] --bind network interface device by name.\n");
return -RT_ERROR;
}
/* get the netdev network interface object by name */
netdev = netdev_get_by_name(argv[1]);
if (netdev == RT_NULL)
{
rt_kprintf("get network interface device(%s) failed.\n", argv[1]);
return -RT_ERROR;
}
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
rt_kprintf("Socket create failed.\n");
return -RT_ERROR;
}
/* initialize the local (client) address to bind */
client_addr.sin_family = AF_INET;
client_addr.sin_port = htons(8080);
/* use the IP address held by the netdev object */
client_addr.sin_addr.s_addr = netdev->ip_addr.addr;
rt_memset(&(client_addr.sin_zero), 0, sizeof(client_addr.sin_zero));
if (bind(sockfd, (struct sockaddr *)&client_addr, sizeof(struct sockaddr)) < 0)
{
rt_kprintf("socket bind failed.\n");
closesocket(sockfd);
return -RT_ERROR;
}
rt_kprintf("socket bind network interface device(%s) success!\n", netdev->name);
/* initialize the server address to connect to */
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
server_addr.sin_addr.s_addr = inet_addr(SERVER_HOST);
rt_memset(&(server_addr.sin_zero), 0, sizeof(server_addr.sin_zero));
/* connect to the server */
if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) < 0)
{
rt_kprintf("socket connect failed!\n");
closesocket(sockfd);
return -RT_ERROR;
}
else
{
rt_kprintf("socket connect success!\n");
}
while (1)
{
send(sockfd, sendbuf, strlen(sendbuf), 0);
//clear the buffer before receiving so the data printed below is NUL-terminated
memset(recvbuf, 0, sizeof(recvbuf));
recv(sockfd, recvbuf, sizeof(recvbuf) - 1, 0);
fputs(recvbuf, stdout);
rt_thread_mdelay(500);
}
/* close the connection */
closesocket(sockfd);
return RT_EOK;
}
#ifdef FINSH_USING_MSH
#include <finsh.h>
MSH_CMD_EXPORT(client, network interface device test);
#endif /* FINSH_USING_MSH */
Now for verification. The ART-Pi networking setup was covered in earlier chapters and is not repeated here; refer back if anything is unclear.
Start the server on the ART-Pi:
(Server console output: screenshot omitted.)
Then start the client; the author ran the clients on Ubuntu:
(Client console output: screenshot omitted.)
Only two clients are used here; feel free to try more if you are interested.
And if you would rather not write a client at all, a network debugging assistant works just as well for testing.