線程池的應用
在我認知中,任何網絡服務器都是一個死循環。這個死循環長下面這個樣子。
基本上服務器框架都是基于這個架構而不斷開發拓展的。
這個死循環總共分為四個步驟,可以涵蓋所有客戶端的需求,然而目前絕大多數企業不會用這樣的架構。
問題在于容易產生阻塞。
作為客戶端,我們當然希望訪問服務器的時候,能夠在短時間內收到回復,意味著自己連接上了該服務器。但是上述架構卻很容易產生響應延遲。
當某一個連接的2.3.4時間過長,也許是因為客戶上傳了很大的數據,也許是因為業務處理起來比較麻煩,需要計算很多東西,也許是客戶需要下載很大的東西,總之只要2,3,4的時間延遲,意味著下一個循環處理其它連接的動作也會被無限延遲。
也就是說,系統需要處理一個很慢的客戶端的連接,后面的所有連接,哪怕只是耗時很短的任務,都需要等這個很慢的任務完成才能進行。
所以除了像redis的服務器,數據庫都是基于hash的key-value結構,業務處理起來十分快速,才會將1,2,3,4都在一個線程中完成,其它的服務器若要提供千萬乃至億級別的客戶接入量,必須更快地處理客戶的連接,解除1和234之間的耦合,這才引入了多線程,我的主線程只負責1,然后將2,3,4分發到其它線程中執行。
然而,如果服務器選擇這種多線程架構,當我們面臨著巨大的客戶端流量,則勢必需要頻繁地創建和銷毀線程,這個過程十分浪費系統資源,還容易造成系統崩潰,然后老板震驚,被迫畢業,流落街頭,思之令人發笑。
解決辦法就是線程池。
我們預先創建好一系列線程,就好比后宮佳麗三千,然后皇上(線程池中樞)來了興致(收到任務),就去翻一個妃子(線程池中某個線程)的牌子。妃子(線程)解決完需求后,回到后宮(線程池),等待下一次召喚。
不用創建和銷毀,而是回收利用,所有池式結構都可以看做是一種對資源調度的緩沖,這就是線程池的精髓。
線程池設計
我們手撕線程池,目的還是搞懂基本原理,不弄太多花里胡哨的架構,比如工廠模式之類的。
當前這個版本的線程池是基于互斥鎖和條件變量實現的。
預告(畫餅):無鎖線程池后續也會手撕。
線程池總體上可以分為三大組件。
- 任務隊列(存還沒有執行的任務)
- 執行隊列(可以看成就是線程池,存放著可以用來執行任務的線程)
- 線程池管理中樞(負責封裝前兩個類,任務的分發,線程池的創建,銷毀,等等。對外提供統一的接口)
其工作流程大概如圖所示
任務隊列節點數據結構
class TaskEle{
public:
void (*taskCallback)(void *arg);
string user_data;
void setFunc(void (*tcb)(void *arg)){
taskCallback = tcb;
}
};
任務隊列負責存還沒有執行的業務,我們可以將每個業務都抽象成一個函數,每個函數自然有可能需要參數。
所以任務隊列的節點需要兩個成員:
- taskCallback:函數回調,執行客戶端想要的業務。
- user_data:函數參數,包含客戶端的信息,比如socketfd等。
順便提供了一個接口,可以修改回調函數。
執行隊列節點數據結構
class ExecEle{
public:
pthread_t tid;
bool usable = true;
ThreadPool* pool;
static void* start(void* arg);
};
- tid:每個節點都對應一個線程,所以需要一個id成員來存線程id,
- usable:這個成員非常妙,它代表當前線程是否可用,默認為true,一旦設置為false,則該線程會結束。
- 使用usable可以在最后銷毀線程池的時候,以一種優雅的方式結束每個線程,而代替pthread_cancel這種強制銷毀線程的方式,因為你不知道線程中的任務是否處理完,強制銷毀就會使某些業務中斷。
- pool:這個成員是指向中樞管理(后面會講)的指針,主要是為了在每個線程中通過pool獲取到一個全局(對于所有線程池線程共享)的互斥鎖和條件變量。
- start:是線程池對象執行的一個實現線程再回收利用的任務循環。具體實現代碼也是在后面會講。
線程池管理中樞設計
總體結構:
public:
//-任務隊列和執行隊列
deque task_queue;
deque exec_queue;
//-條件變量
pthread_cond_t cont;
//-互斥鎖
pthread_mutex_t mutex;
//-線程池大小
int thread_count;
//-構造函數
ThreadPool(int thread_count):thread_count(thread_count);
//-創建線程池
void createPool();
//-加入任務
void push_task(void(*tcb)(void* arg),int i);
//-利用析構銷毀線程池
~ThreadPool();
};*>*>
關于數據成員:
- task_queue、exec_queue: 任務隊列和執行隊列,我使用deque作為容器實現隊列。
- cont:所有線程共享的條件變量。
- mutex:所有線程共享的互斥鎖。
- thread_count: 線程池創建的時候,初始大小
關于成員方法:
- ThreadPool:構造函數
- createPool:創建線程池
- push_task: 給服務器主循環用的,給線程池添加任務。
- ~ ThreadPool: 銷毀線程池,事實上應該單獨定義一個destroyPool的api,我這里為了簡便合并到析構中了。
ExecEle的start函數實現
現在對于ThreadPool對象有概念以后,可以先將剛剛執行隊列節點ExecEle的start函數實現,其代表了每個線程池的線程始終在跑的循環,在無任務分配的時候阻塞在某個位置。
//-獲得執行對象
ExecEle *ee = (ExecEle*)arg;
while(true){
//-加鎖
pthread_mutex_lock(&(ee->pool->mutex));
while(ee->pool->task_queue.empty()){//-如果任務隊列為空,等待新任務
if(!ee->usable){
break;
}
pthread_cond_wait(&ee->pool->cont, &ee->pool->mutex);
}
if(!ee -> usable){
pthread_mutex_unlock(&ee -> pool -> mutex);
break;
}
TaskEle *te = ee->pool->task_queue.front();
ee->pool->task_queue.pop_front();
//-解鎖
pthread_mutex_unlock(&(ee->pool -> mutex));
//-執行任務回調
te->user_data+=to_string(pthread_self());
te->taskCallback(te);
}
//-刪除線程執行對象
delete ee;
fprintf(stdout,"destroy thread %dn",pthread_self());
return NULL;
}
arg參數指向的是該線程函數對應的執行元素ExecEle本身的指針,我們定義其為ee。然后進入死循環,通過ee,我們可以獲得線程池中樞對象pool。
通過pool。我們可以獲得任務隊列的情況,當任務隊列為空,則線程進入阻塞狀態,等待任務隊列有任務進來后,通過條件變量通知,再恢復執行。
恢復執行后,從任務隊列中取出隊首的任務,這個過程需要在mutex的范圍內,保證獨占性。
之后解除互斥鎖,開始執行任務的回調。執行完進行入下個循環,嘗試再次獲得互斥鎖。
最后說一說usable,當我們銷毀線程池的時候,設置每一個線程的usable為false,那么不會立刻中斷每個線程正在執行的回調,而是等回調結束后,在下一次循環中如果檢測到usable為false后,就會退出整個大循環,并釋放自己的鎖,喚醒線程池其它休眠的線程。退出大循環后,線程自然而優雅地結束。
之后是ThreadPool自己的api實現
構造函數ThreadPool實現:
//-初始化條件變量和互斥鎖
pthread_cond_init(&cont,NULL);
pthread_mutex_init(&mutex,NULL);
}
主要為了初始化cont,mutex,thread_count。
創建線程池createPool實現:
int ret;
//-初始執行隊列
for(int i = 0;i ExecEle *ee = new ExecEle;
ee->pool = const_cast(this);
if(ret = pthread_create(&(ee->tid),NULL,ee->start,ee)){
delete ee;
ERR_EXIT_THREAD(ret,"pthread_create");
}
fprintf(stdout,"create thread %dn",i);
exec_queue.push_back(ee);
}
fprintf(stdout,"create pool finish...n");
}*>;++i){
通過pthread_create創建thread_count個線程,每個線程執行自己的start函數進入等待任務循環,并阻塞在鎖和條件變量的位置。將exec對象push進執行隊列。
添加任務 push_task實現:
TaskEle *te = new TaskEle;
te->setFunc(tcb);
te->user_data = "Task "+to_string(i)+" run in thread ";
//-加鎖
pthread_mutex_lock(&mutex);
task_queue.push_back(te);
//-通知執行隊列中的一個進行任務
pthread_cond_signal(&cont);
//-解鎖
pthread_mutex_unlock(&mutex);
}
主要功能是構造TaskEle對象并加入到執行隊列中。
每個TaskEle可能需要執行不同的業務,所以push_task需要傳入對應業務的回調tcb(task callback)
i是我加的額外參數,代表主線程中連接的客戶端編號,其意義可以是socketfd。
注意在修改執行隊列(push)的時候,需要加鎖保證獨占。
銷毀線程池~ ThreadPool 實現:
for(int i = 0;i exec_queue[i]->usable = false;
}
pthread_mutex_lock(&mutex);
//-清空任務隊列
task_queue.clear();
//-廣播給每個執行線程令其退出(執行線程破開循環會free掉堆內存)
pthread_cond_broadcast(&cont);
pthread_mutex_unlock(&mutex);//-讓其他線程拿到鎖
//-等待所有線程退出
for(int i = 0;i pthread_join(exec_queue[i] -> tid,NULL);
}
//-清空執行隊列
exec_queue.clear();
//-銷毀鎖和條件變量
pthread_cond_destroy(&cont);
pthread_mutex_destroy(&mutex);
}();>();++i){
先將所有線程的usable設置為false,之后加鎖,清空任務隊列,并通過條件變量通知所有線程,等所有線程退出后,銷毀執行隊列,銷毀鎖和條件變量。
業務代碼和服務器主循環
void execFunc(void* arg){
TaskEle *te =(TaskEle*)arg;
fprintf(stdout, "%sn",te->user_data.c_str());
};
int main(){
//-創建線程池
ThreadPool pool(100);
pool.createPool();
//-創建任務
for(int i =0;i<1000;++i){
pool.push_task(&execFunc,i);
}
exit(EXIT_SUCCESS);
}
隨便寫的線程執行的業務,打印一下客戶信息。
主線程創建100大小的線程池,并添加1000個任務(連接)。
完整代碼
//-三個組件:任務隊列,執行隊列,線程池(中樞管理)
#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;
//-打印線程錯誤專用,根據err來識別錯誤信息
static inline void ERR_EXIT_THREAD(int err, const char * msg){
fprintf(stderr,"%s:%sn",strerror(err),msg);
exit(EXIT_FAILURE);
}
class ThreadPool;//-聲明
//- 任務隊列元素
class TaskEle{
public:
void (*taskCallback)(void *arg);
string user_data;
void setFunc(void (*tcb)(void *arg)){
taskCallback = tcb;
}
};
//-執行隊列元素
class ExecEle{
public:
pthread_t tid;
bool usable = true;
ThreadPool* pool;
static void* start(void* arg);
};
//-線程池
class ThreadPool{
public:
//-任務隊列和執行隊列
deque task_queue;
deque exec_queue;
//-條件變量
pthread_cond_t cont;
//-互斥鎖
pthread_mutex_t mutex;
//-線程池大小
int thread_count;
//-構造函數
ThreadPool(int thread_count):thread_count(thread_count){
//-初始化條件變量和互斥鎖
pthread_cond_init(&cont,NULL);
pthread_mutex_init(&mutex,NULL);
}
void createPool(){
int ret;
//-初始執行隊列
for(int i = 0;i ExecEle *ee = new ExecEle;
ee->pool = const_cast(this);
if(ret = pthread_create(&(ee->tid),NULL,ee->start,ee)){
delete ee;
ERR_EXIT_THREAD(ret,"pthread_create");
}
fprintf(stdout,"create thread %dn",i);
exec_queue.push_back(ee);
}
fprintf(stdout,"create pool finish...n");
}
//-加入任務
void push_task(void(*tcb)(void* arg),int i){
TaskEle *te = new TaskEle;
te->setFunc(tcb);
te->user_data = "Task "+to_string(i)+" run in thread ";
//-加鎖
pthread_mutex_lock(&mutex);
task_queue.push_back(te);
//-通知執行隊列中的一個進行任務
pthread_cond_signal(&cont);
//-解鎖
pthread_mutex_unlock(&mutex);
}
//-銷毀線程池
~ThreadPool() {
for(int i = 0;i exec_queue[i]->usable = false;
}
pthread_mutex_lock(&mutex);
//-清空任務隊列
task_queue.clear();
//-廣播給每個執行線程令其退出(執行線程破開循環會free掉堆內存)
pthread_cond_broadcast(&cont);
pthread_mutex_unlock(&mutex);//-讓其他線程拿到鎖
//-等待所有線程退出
for(int i = 0;i pthread_join(exec_queue[i] -> tid,NULL);
}
//-清空執行隊列
exec_queue.clear();
//-銷毀鎖和條件變量
pthread_cond_destroy(&cont);
pthread_mutex_destroy(&mutex);
}
};
void* ExecEle::start(void*arg){
//-獲得執行對象
ExecEle *ee = (ExecEle*)arg;
while(true){
//-加鎖
pthread_mutex_lock(&(ee->pool->mutex));
while(ee->pool->task_queue.empty()){//-如果任務隊列為空,等待新任務
if(!ee->usable){
break;
}
pthread_cond_wait(&ee->pool->cont, &ee->pool->mutex);
}
if(!ee -> usable){
pthread_mutex_unlock(&ee -> pool -> mutex);
break;
}
TaskEle *te = ee->pool->task_queue.front();
ee->pool->task_queue.pop_front();
//-解鎖
pthread_mutex_unlock(&(ee->pool -> mutex));
//-執行任務回調
te->user_data+=to_string(pthread_self());
te->taskCallback(te);
}
//-刪除線程執行對象
delete ee;
fprintf(stdout,"destroy thread %dn",pthread_self());
return NULL;
}
//-線程執行的業務函數
void execFunc(void* arg){
TaskEle *te =(TaskEle*)arg;
fprintf(stdout, "%sn",te->user_data.c_str());
};
int main(){
//-創建線程池
ThreadPool pool(100);
pool.createPool();
//-創建任務
for(int i =0;i<1000;++i){
pool.push_task(&execFunc,i);
}
exit(EXIT_SUCCESS);
}();>();++i){
*>;++i){
*>*>
-
服務器
+關注
關注
12文章
9123瀏覽量
85328 -
數據庫
+關注
關注
7文章
3794瀏覽量
64362 -
線程池
+關注
關注
0文章
57瀏覽量
6844
發布評論請先 登錄
相關推薦
評論