本文主要分為三個部分:
一、線程簡介及使用
正確的使用線程是一個優秀程序員必備的素質。線程類似于進程,單處理器系統中內核通過時間片輪轉模擬線程的并發運行。那么,對于大多數合作任務,為什么多線程比多進程優越呢?
這是因為,線程共享相同的內存空間,不同線程之間可以共享內存中的全局變量。使用fork()寫過子進程的同學都會意識到多線程的重要性。為什么呢?雖然fork()允許創建多個進程,但它會帶來進程之間的通信問題:各個進程都有各自獨立的內存空間,需要使用某種IPC(進程間通信)進行通信,但它們都遇到兩個重要障礙:
- 強加了某種形式的額外內核開銷,從而降低性能。
- 對于大多數情形,IPC 不是對于代碼的“自然”擴展。通常極大地增加了程序的復雜性。
POSIX多線程不必使用諸如管道、套接字等長距離通信,這些通信方式開銷大、復雜,由于所有線程都駐留在同一內存空間,因此只需要考慮同步問題即可。
線程是快捷的
與標準fork()相比,線程開銷較少。無需單獨復制進程的內存空間或文件描述符等等,大大節省CPU時間,創建速度比進程創建快到10-100倍,那么可以大量使用線程而無需擔心CPU或內存不足。
同時,線程能夠充分利用多處理器的CPU,特定類型線程(CPU密集型)的性能隨處理器數目線性提高。如果編寫的是CPU密集型程序,則絕對要在代碼中使用多線程,無需使用繁瑣的IPC及其他復雜通信機制。
線程是可移植的
fork()的底層系統調用是__clone(),新的子進程根據該系統調用的參數有選擇的共享父進程的執行環境(內存空間,文件描述符等),但__clone()也有不好的一面,__clone()是特定于Linux平臺的,不使用于實現可移植程序。而Linux的POSIX線程是可移植的,代碼運行于Solaris、FreeBSD、Linux 和其它平臺。
代碼:
#include
#include
#include
void *thread_function(void *arg) {
int i;
for ( i=0; i<20; i++) {
printf("Thread says hi!n");
sleep(1);
}
return NULL;
}
int main(void) {
pthread_t mythread;
if ( pthread_create( &mythread, NULL, thread_function, NULL) ) {
printf("error creating thread.");
abort();
}
if ( pthread_join ( mythread, NULL ) ) {
printf("error joining thread.");
abort();
}
exit(0);
}
該程序非常簡單,但是也有我們需要學習的地方:
1)線程id的類型為pthread_t,可以認為它是一種句柄,后續的使用都是利用它完成的
2)線程創建函數需要依次指定線程屬性、回調函數以及線程傳參(簡單變量或結構體),返回值考慮
3)線程創建后兩個線程如何運行,子線程結束后如何處理
對于第三個問題:
子線程創建后,POSIX線程標準將它們視為相同等級的線程,子線程開始執行的同時,主線程繼續向下執行(其實這里已經沒有像進程那樣的父子概念了,這里只是為了更好的區分),二者并沒有一定的先后順序,CPU時間片的分配取決于內核和線程庫。
子線程結束時的處理,當子線程的默認joinable屬性時,由主線程對其進行清理;當子線程為detached屬性時,由系統進程對其清理。如果未對線程進行正確清理,最終會導致 pthread_create() 調用失敗。
代碼2:
#include
#include
#include
int myglobal;
void *thread_function(void *arg) {
int i,j;
for ( i=0; i<20; i++) {
j=myglobal;
j=j+1;
printf(".");
fflush(stdout);
sleep(1);
myglobal=j;
}
return NULL;
}
int main(void) {
pthread_t mythread;
int i;
if ( pthread_create( &mythread, NULL, thread_function, NULL) ) {
printf("error creating thread.");
abort();
}
for ( i=0; i<20; i++) {
myglobal=myglobal+1;
printf("o");
fflush(stdout);
sleep(1);
}
if ( pthread_join ( mythread, NULL ) ) {
printf("error joining thread.");
abort();
}
printf("nmyglobal equals %dn",myglobal);
exit(0);
}
輸出:
非常意外吧,主線程和子線程各自對myglobal進行20次加1操作,程序結束時myglobal應當為40,然而myglobal的輸出為21,這里面肯定有問題。究竟是為什么呢?
核心原因就是:對全局變量的修改并不是原子操作,假設子線程讀取全局變量到寄存器,寄存器內部完成加1,之后即將重新賦值給全局變量前的時刻。主線程開始讀取全局變量完成操作,那么此時就覆蓋了子線程的這一環節操作,該操作也就成了無效操作。
解決這一問題就需要互斥操作了,見第二部分。
二、互斥鎖及條件變量的使用
通過互斥鎖 (mutex)完成對臨界資源的鎖操作,能夠保證各個線程對其的唯一訪問。
互斥對象的工作原理:
如果線程 a 試圖鎖定一個互斥對象,而此時線程 b 已鎖定了同一個互斥對象時,線程 a 就將進入睡眠狀態。一旦線程 b 釋放了互斥對象(通過 pthread_mutex_unlock() 調用),線程 a 就能夠鎖定這個互斥對象(換句話說,線程 a 就將從 pthread_mutex_lock() 函數調用中返回,同時互斥對象被鎖定)。同樣地,其他對已鎖定的互斥對象上調用 pthread_mutex_lock() 的所有線程都將進入睡眠狀態,這些睡眠的線程將“排隊”訪問這個互斥對象。
看到了嗎?其他試圖訪問已被鎖定互斥對象的線程都會排隊睡眠的:)
代碼修改:
#include
#include
#include
int myglobal;
pthread_mutex_t mymutex=PTHREAD_MUTEX_INITIALIZER;
void *thread_function(void *arg) {
int i,j;
for ( i=0; i<20; i++) {
pthread_mutex_lock(&mymutex);
j=myglobal;
j=j+1;
printf(".");
fflush(stdout);
sleep(1);
myglobal=j;
pthread_mutex_unlock(&mymutex);
}
return NULL;
}
int main(void) {
pthread_t mythread;
int i;
if ( pthread_create( &mythread, NULL, thread_function, NULL) ) {
printf("error creating thread.");
abort();
}
for ( i=0; i<20; i++) {
pthread_mutex_lock(&mymutex);
myglobal=myglobal+1;
pthread_mutex_unlock(&mymutex);
printf("o");
fflush(stdout);
sleep(1);
}
if ( pthread_join ( mythread, NULL ) ) {
printf("error joining thread.");
abort();
}
printf("nmyglobal equals %dn",myglobal);
exit(0);
}
此時pthread_mutex_lock() 和 pthread_mutex_unlock() 函數調用,如同“在施工中”標志一樣,將正在修改和讀取的某一特定共享數據包圍起來。其他線程訪問時繼續睡眠,直到該線程完成對其的操作。
等待條件之POSIX條件變量
互斥對象是線程程序必需的工具,但它們并非萬能的。例如,如果線程正在等待共享數據內某個條件出現,那會發生什么呢?
1)使用忙查詢的方法非常浪費時間和資源,效率非常低。代碼可以反復對互斥對象鎖定和解鎖,以檢查值的任何變化。同時,還要快速將互斥對象解鎖,以便其它線程能夠進行任何必需的更改。這是一種非常可怕的方法,因為線程需要在合理的時間范圍內頻繁地循環檢測變化。
2)解決這個問題的最好方法是使用pthread_cond_wait() 調用來等待特殊條件發生。當線程在等待滿足某些條件時使線程進入睡眠狀態。一旦條件滿足,還需要一種方法以喚醒因等待滿足特定條件而睡眠的線程。如果能夠做到這一點,線程代碼將是非常高效的,并且不會占用寶貴的互斥對象鎖。這正是 POSIX 條件變量能做的事!
了解 pthread_cond_wait() 的作用非常重要 – 它是 POSIX 線程信號發送系統的核心,也是最難以理解的部分。
條件變量的概念
通常在程序里,我們使用條件變量來表示等待”某一條件”的發生。雖然名叫”條件變量”,但是它本身并不保存條件狀態,本質上條件變量僅僅是一種通訊機制:當有一個線程在等待(pthread_cond_wait)某一條件變量的時候,會將當前的線程掛起,直到另外的線程發送信號(pthread_cond_signal)通知其解除阻塞狀態。
由于要用額外的共享變量保存條件狀態(這個變量可以是任何類型比如bool),由于這個變量會同時被不同的線程訪問,因此需要一個額外的mutex保護它。
《Linux系統編程手冊》也有這個問題的介紹:
A condition variable is always used in conjunction with a mutex. The mutex provides mutual exclusion for accessing the shared variable, while the condition variable is used to signal changes in the variable’s state.
條件變量總是結合mutex使用,條件變量就共享變量的狀態改變發出通知,mutex就是用來保護這個共享變量的。
cpp官網描述
pthread_cond_wait實現步驟
首先,我們使用條件變量的接口實現一個簡單的生產者-消費者模型,avail就是保存條件狀態的共享變量,它對生產者線程、消費者線程均可見。不考慮錯誤處理,先看生產者實現:
avail++;
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&cond); /* Wake sleeping consumer */
因為avail對兩個線程都可見,因此對其修改均應該在mutex的保護之下,再來看消費者線程實現:
{
pthread_mutex_lock(&mutex);
while (avail == 0)
pthread_cond_wait(&cond, &mutex);
while (avail > 0)
{
/* Do something */
avail--;
}
pthread_mutex_unlock(&mutex);
}
當”avail==0”時,消費者線程會阻塞在pthread_cond_wait()函數上。如果pthread_cond_wait()僅需要一個pthread_cond_t參數的話,此時mutex已經被鎖,要是不先將mutex變量解鎖,那么其他線程(如生產者線程)永遠無法訪問avail變量,也就無法繼續生產,消費者線程會一直阻塞下去。
因此pthread_cond_wait()對mutex解鎖,然后進入睡眠狀態,等待cond以接收POSIX 線程“信號”。一旦接收到“信號”(加引號是因為我們并不是在討論傳統的 UNIX 信號,而是來自pthread_cond_signal() 或 pthread_cond_broadcast() 調用的信號),它就會蘇醒。但 pthread_cond_wait() 沒有立即返回 – 它還要做一件事:重新鎖定 mutex。
理解后提問:調用 pthread_cond_wait() 之 前,互斥對象必須處于什么狀態?pthread_cond_wait() 調用返回之后,互斥對象處于什么狀態?這兩個問題的答案都是“鎖定”。
綜上,pthread_cond_wait()函數大致會分為3個部分:
1.解鎖互斥量mutex 2.阻塞調用線程,直到當前的條件變量收到通知 3.重新鎖定互斥量mutex
其中1和2是原子操作,也就是說在pthread_cond_wait()調用線程陷入阻塞之前其他的線程無法獲取當前的mutex,也就不能就該條件變量發出通知。
虛假喚醒
前面判斷條件狀態的時候avail > 0放在了while循環中,而不是if中,這是因為pthread_cond_wait()阻塞在條件變量上的時候,即使其他的線程沒有就該條件變量發出通知(pthread_cond_signal()/pthread_cond_broadcast()),條件變量也有可能會自己醒來(pthread_cond_wait()函數返回),因此需要重新檢查一下共享變量上的條件成不成立,確保條件變量是真的收到了通知,否則繼續阻塞等待。關于虛假喚醒的相關介紹,可以戳這里查看維基百科下面的幾個引用:https://en.wikipedia.org/wiki/Spurious_wakeup。
三、工作隊列的實現
這里摘抄IBM第三部分應用實現代碼:
在這個方案中,我們創建了許多工作程序線程。每個線程都會檢查 wq(“工作隊列”),查看是否有需要完成的工作。如果有需要完成的工作,那么線程將從隊列中除去一個節點,執行這些特定工作,然后等待新的工作到達。
與此同時,主線程負責創建這些工作程序線程、將工作添加到隊列,然后在它退出時收集所有工作程序線程。您將會遇到許多 C 代碼,好好準備吧!
隊列
需要隊列是出于兩個原因。首先,需要隊列來保存工作作業。還需要可用于跟蹤已終止線程的數據結構。還記得前幾篇文章(請參閱本文結尾處的 參考資料)中,我曾提到過需要使用帶有特定進程標識的 pthread_join 嗎?使用“清除隊列”(稱作 “cq”)可以解決無法等待 任何已終止線程的問題(稍后將詳細討論這個問題)。以下是標準隊列代碼。將此代碼保存到文件 queue.h 和 queue.c:
queue.h
** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.
** Author: Daniel Robbins
** Date: 16 Jun 2000
*/
typedef struct node {
struct node *next;
} node;
typedef struct queue {
node *head, *tail;
} queue;
void queue_init(queue *myroot);
void queue_put(queue *myroot, node *mynode);
node *queue_get(queue *myroot);
queue.c
** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.
** Author: Daniel Robbins
** Date: 16 Jun 2000
**
** This set of queue functions was originally thread-aware. I
** redesigned the code to make this set of queue routines
** thread-ignorant (just a generic, boring yet very fast set of queue
** routines). Why the change? Because it makes more sense to have
** the thread support as an optional add-on. Consider a situation
** where you want to add 5 nodes to the queue. With the
** thread-enabled version, each call to queue_put() would
** automatically lock and unlock the queue mutex 5 times -- that's a
** lot of unnecessary overhead. However, by moving the thread stuff
** out of the queue routines, the caller can lock the mutex once at
** the beginning, then insert 5 items, and then unlock at the end.
** Moving the lock/unlock code out of the queue functions allows for
** optimizations that aren't possible otherwise. It also makes this
** code useful for non-threaded applications.
**
** We can easily thread-enable this data structure by using the
** data_control type defined in control.c and control.h. */
#include
#include "queue.h"
void queue_init(queue *myroot) {
myroot->head=NULL;
myroot->tail=NULL;
}
void queue_put(queue *myroot,node *mynode) {
mynode->next=NULL;
if (myroot->tail!=NULL)
myroot->tail->next=mynode;
myroot->tail=mynode;
if (myroot->head==NULL)
myroot->head=mynode;
}
node *queue_get(queue *myroot) {
//get from root
node *mynode;
mynode=myroot->head;
if (myroot->head!=NULL)
myroot->head=myroot->head->next;
return mynode;
}
data_control 代碼
我編寫的并不是線程安全的隊列例程,事實上我創建了一個“數據包裝”或“控制”結構,它可以是任何線程支持的數據結構。看一下 control.h:
typedef struct data_control {
pthread_mutex_t mutex;
pthread_cond_t cond;
int active;
} data_control;
現在您看到了 data_control 結構定義,以下是它的視覺表示:
所使用的 data_control 結構
圖像中的鎖代表互斥對象,它允許對數據結構進行互斥訪問。黃色的星代表條件變量,它可以睡眠,直到所討論的數據結構改變為止。on/off 開關表示整數 “active”,它告訴線程此數據是否是活動的。在代碼中,我使用整數 active 作為標志,告訴工作隊列何時應該關閉。以下是 control.c:
control.c
** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.
** Author: Daniel Robbins
** Date: 16 Jun 2000
**
** These routines provide an easy way to make any type of
** data-structure thread-aware. Simply associate a data_control
** structure with the data structure (by creating a new struct, for
** example). Then, simply lock and unlock the mutex, or
** wait/signal/broadcast on the condition variable in the data_control
** structure as needed.
**
** data_control structs contain an int called "active". This int is
** intended to be used for a specific kind of multithreaded design,
** where each thread checks the state of "active" every time it locks
** the mutex. If active is 0, the thread knows that instead of doing
** its normal routine, it should stop itself. If active is 1, it
** should continue as normal. So, by setting active to 0, a
** controlling thread can easily inform a thread work crew to shut
** down instead of processing new jobs. Use the control_activate()
** and control_deactivate() functions, which will also broadcast on
** the data_control struct's condition variable, so that all threads
** stuck in pthread_cond_wait() will wake up, have an opportunity to
** notice the change, and then terminate.
*/
#include "control.h"
int control_init(data_control *mycontrol) {
int mystatus;
if (pthread_mutex_init(&(mycontrol->mutex),NULL))
return 1;
if (pthread_cond_init(&(mycontrol->cond),NULL))
return 1;
mycontrol->active=0;
return 0;
}
int control_destroy(data_control *mycontrol) {
int mystatus;
if (pthread_cond_destroy(&(mycontrol->cond)))
return 1;
if (pthread_cond_destroy(&(mycontrol->cond)))
return 1;
mycontrol->active=0;
return 0;
}
int control_activate(data_control *mycontrol) {
int mystatus;
if (pthread_mutex_lock(&(mycontrol->mutex)))
return 0;
mycontrol->active=1;
pthread_mutex_unlock(&(mycontrol->mutex));
pthread_cond_broadcast(&(mycontrol->cond));
return 1;
}
int control_deactivate(data_control *mycontrol) {
int mystatus;
if (pthread_mutex_lock(&(mycontrol->mutex)))
return 0;
mycontrol->active=0;
pthread_mutex_unlock(&(mycontrol->mutex));
pthread_cond_broadcast(&(mycontrol->cond));
return 1;
}
調試時間
在開始調試之前,還需要一個文件。以下是 dbug.h:
{ printf("Aborting at line %d in source file %sn",__LINE__,__FILE__); abort(); }
此代碼用于處理工作組代碼中的不可糾正錯誤。
工作組代碼說到工作組代碼,以下就是:workcrew.c
#include
#include "control.h"
#include "queue.h"
#include "dbug.h"
/* the work_queue holds tasks for the various threads to complete. */
struct work_queue {
data_control control;
queue work;
} wq;
/* I added a job number to the work node. Normally, the work node
would contain additional data that needed to be processed. */
typedef struct work_node {
struct node *next;
int jobnum;
} wnode;
/* the cleanup queue holds stopped threads. Before a thread
terminates, it adds itself to this list. Since the main thread is
waiting for changes in this list, it will then wake up and clean up
the newly terminated thread. */
struct cleanup_queue {
data_control control;
queue cleanup;
} cq;
/* I added a thread number (for debugging/instructional purposes) and
a thread id to the cleanup node. The cleanup node gets passed to
the new thread on startup, and just before the thread stops, it
attaches the cleanup node to the cleanup queue. The main thread
monitors the cleanup queue and is the one that performs the
necessary cleanup. */
typedef struct cleanup_node {
struct node *next;
int threadnum;
pthread_t tid;
} cnode;
void *threadfunc(void *myarg) {
wnode *mywork;
cnode *mynode;
mynode=(cnode *) myarg;
pthread_mutex_lock(&wq.control.mutex);
while (wq.control.active) {
while (wq.work.head==NULL && wq.control.active) {
pthread_cond_wait(&wq.control.cond, &wq.control.mutex);
}
if (!wq.control.active)
break;
//we got something!
mywork=(wnode *) queue_get(&wq.work);
pthread_mutex_unlock(&wq.control.mutex);
//perform processing...
printf("Thread number %d processing job %dn",mynode->threadnum,mywork->jobnum);
free(mywork);
pthread_mutex_lock(&wq.control.mutex);
}
pthread_mutex_unlock(&wq.control.mutex);
pthread_mutex_lock(&cq.control.mutex);
queue_put(&cq.cleanup,(node *) mynode);
pthread_mutex_unlock(&cq.control.mutex);
pthread_cond_signal(&cq.control.cond);
printf("thread %d shutting down...n",mynode->threadnum);
return NULL;
}
#define NUM_WORKERS 4
int numthreads;
void join_threads(void) {
cnode *curnode;
printf("joining threads...n");
while (numthreads) {
pthread_mutex_lock(&cq.control.mutex);
/* below, we sleep until there really is a new cleanup node. This
takes care of any false wakeups... even if we break out of
pthread_cond_wait(), we don't make any assumptions that the
condition we were waiting for is true. */
while (cq.cleanup.head==NULL) {
pthread_cond_wait(&cq.control.cond,&cq.control.mutex);
}
/* at this point, we hold the mutex and there is an item in the
list that we need to process. First, we remove the node from
the queue. Then, we call pthread_join() on the tid stored in
the node. When pthread_join() returns, we have cleaned up
after a thread. Only then do we free() the node, decrement the
number of additional threads we need to wait for and repeat the
entire process, if necessary */
curnode = (cnode *) queue_get(&cq.cleanup);
pthread_mutex_unlock(&cq.control.mutex);
pthread_join(curnode->tid,NULL);
printf("joined with thread %dn",curnode->threadnum);
free(curnode);
numthreads--;
}
}
int create_threads(void) {
int x;
cnode *curnode;
for (x=0; x curnode=malloc(sizeof(cnode));
if (!curnode)
return 1;
curnode->threadnum=x;
if (pthread_create(&curnode->tid, NULL, threadfunc, (void *) curnode))
return 1;
printf("created thread %dn",x);
numthreads++;
}
return 0;
}
void initialize_structs(void) {
numthreads=0;
if (control_init(&wq.control))
dabort();
queue_init(&wq.work);
if (control_init(&cq.control)) {
control_destroy(&wq.control);
dabort();
}
queue_init(&wq.work);
control_activate(&wq.control);
}
void cleanup_structs(void) {
control_destroy(&cq.control);
control_destroy(&wq.control);
}
int main(void) {
int x;
wnode *mywork;
initialize_structs();
/* CREATION */
if (create_threads()) {
printf("Error starting threads... cleaning up.n");
join_threads();
dabort();
}
pthread_mutex_lock(&wq.control.mutex);
for (x=0; x<16000; x++) {
mywork=malloc(sizeof(wnode));
if (!mywork) {
printf("ouch! can't malloc!n");
break;
}
mywork->jobnum=x;
queue_put(&wq.work,(node *) mywork);
}
pthread_mutex_unlock(&wq.control.mutex);
pthread_cond_broadcast(&wq.control.cond);
printf("sleeping...n");
sleep(2);
printf("deactivating work queue...n");
control_deactivate(&wq.control);
/* CLEANUP */
join_threads();
cleanup_structs();
};>
代碼初排
現在來快速初排代碼。定義的第一個結構稱作 “wq”,它包含了 data_control 和隊列頭。data_control 結構用于仲裁對整個隊列的訪問,包括隊列中的節點。下一步工作是定義實際的工作節點。要使代碼符合本文中的示例,此處所包含的都是作業號。
接著,創建清除隊列。注釋說明了它的工作方式。好,現在讓我們跳過 threadfunc()、join_threads()、create_threads() 和 initialize_structs() 調用,直接跳到 main()。所做的第一件事就是初始化結構 – 這包括初始化 data_controls 和隊列,以及激活工作隊列。
有關清除的注意事項
現在初始化線程。如果看一下 create_threads() 調用,似乎一切正常 – 除了一件事。請注意,我們正在分配清除節點,以及初始化它的線程號和 TID 組件。我們還將清除節點作為初始自變量傳遞給每一個新的工作程序線程。為什么這樣做?
因為當某個工作程序線程退出時,它會將其清除節點連接到清除隊列,然后終止。那時,主線程會在清除隊列中檢測到這個節點(利用條件變量),并將這個節點移出隊列。因為 TID(線程標識)存儲在清除節點中,所以主線程可以確切知道哪個線程已終止了。然后,主線程將調用 pthread_join(tid),并聯接適當的工作程序線程。如果沒有做記錄,那么主線程就需要按任意順序聯接工作程序線程,可能是按它們的創建順序。由于線程不一定按此順序終止,那么主線程可能會在已經聯接了十個線程時,等待聯接另一個線程。您能理解這種設計決策是如何使關閉代碼加速的嗎(尤其在使用幾百個工作程序線程的情況下)?
創建工作
我們已啟動了工作程序線程(它們已經完成了執行 threadfunc(),稍后將討論此函數),現在主線程開始將工作節點插入工作隊列。首先,它鎖定 wq 的控制互斥對象,然后分配 16000 個工作包,將它們逐個插入隊列。完成之后,將調用 pthread_cond_broadcast(),于是所有正在睡眠的線程會被喚醒,并開始執行工作。此時,主線程將睡眠兩秒鐘,然后釋放工作隊列,并通知工作程序線程終止活動。接著,主線程會調用 join_threads() 函數來清除所有工作程序線程。
threadfunc()
現在來討論 threadfunc(),這是所有工作程序線程都要執行的代碼。當工作程序線程啟動時,它會立即鎖定工作隊列互斥對象,獲取一個工作節點(如果有的話),然后對它進行處理。如果沒有工作,則調用 pthread_cond_wait()。您會注意到這個調用在一個非常緊湊的 while() 循環中,這是非常重要的。當從 pthread_cond_wait() 調用中蘇醒時,決不能認為條件肯定發生了 – 它 可能發生了,也可能沒有發生。如果發生了這種情況,即錯誤地喚醒了線程,而列表是空的,那么 while 循環將再次調用 pthread_cond_wait()。
如果有一個工作節點,那么我們只打印它的作業號,釋放它并退出。然而,實際代碼會執行一些更實質性的操作。在 while() 循環結尾,我們鎖定了互斥對象,以便檢查 active 變量,以及在循環頂部檢查新的工作節點。如果執行完此代碼,就會發現如果 wq.control.active 是 0,while 循環就會終止,并會執行 threadfunc() 結尾處的清除代碼。
工作程序線程的清除代碼部件非常有趣。首先,由于 pthread_cond_wait() 返回了鎖定的互斥對象,它會對 work_queue 解鎖。然后,它鎖定清除隊列,添加清除代碼(包含了 TID,主線程將使用此 TID 來調用 pthread_join()),然后再對清除隊列解鎖。此后,它發信號給所有 cq 等待者 (pthread_cond_signal(&cq.control.cond)),于是主線程就知道有一個待處理的新節點。我們不使用 pthread_cond_broadcast(),因為沒有這個必要 – 只有一個線程(主線程)在等待清除隊列中的新節點。當它調用 join_threads() 時,工作程序線程將打印關閉消息,然后終止,等待主線程發出的 pthread_join() 調用。
join_threads()
如果要查看關于如何使用條件變量的簡單示例,請參考 join_threads() 函數。如果還有工作程序線程,join_threads() 會一直執行,等待清除隊列中新的清除節點。如果有新節點,我們會將此節點移出隊列、對清除隊列解鎖(從而使工作程序可以添加清除節點)、聯接新的工作程序線程(使用存儲在清除節點中的 TID)、釋放清除節點、減少“現有”線程的數量,然后繼續。
-
cpu
+關注
關注
68文章
10855瀏覽量
211601 -
通信
+關注
關注
18文章
6028瀏覽量
135951 -
處理器系統
+關注
關注
0文章
9瀏覽量
7789 -
線程
+關注
關注
0文章
504瀏覽量
19675
發布評論請先 登錄
相關推薦
評論