Thread類
Thread類封裝了POSIX標準中的多線程機制,提供了一種簡單易用的線程模型。Thread類是Iperf的實現(xiàn)中比較重要的類,使Iperf實現(xiàn)多線程并行操作的核心。
Thread類的定義在文件lib/Thread.hpp中,其實現(xiàn)位于lib/Thread.cpp中。
/* ------------------------------------------------------------------- */
class Thread {
public:
Thread( void );
virtual ~Thread();
// start or stop a thread executing
void Start( void );
void Stop( void );
// run is the main loop for this thread
// usually this is called by Start(), but may be called
// directly for single-threaded applications.
virtual void Run( void ) = 0;
// wait for this or all threads to complete
void Join( void );
static void Joinall( void );
void DeleteSelfAfterRun( void ) {
mDeleteSelf = true;
}
// set a thread to be daemon, so joinall won't wait on it
void SetDaemon( void );
// returns the number of user (i.e. not daemon) threads
static int NumUserThreads( void ) {
return sNum;
}
static nthread_t GetID( void );
static bool EqualID( nthread_t inLeft, nthread_t inRight );
static nthread_t ZeroID( void );
protected:
nthread_t mTID;
bool mDeleteSelf;
// count of threads; used in joinall
static int sNum;
static Condition sNum_cond;
private:
// low level function which calls Run() for the object
// this must be static in order to work with pthread_create
static void* Run_Wrapper( void* paramPtr );
}; // end class Thread
數(shù)據(jù)成員說明:
mTID紀錄本線程的線程ID;
mDeleteSelf通過方法DeleteSelfAfterRun設(shè)置,用來說明是否在線程結(jié)束后釋放屬于該現(xiàn)程的變量;
sNum是一個靜態(tài)變量,即為所有的Thread實例所共有的。該變量紀錄所生成的線程的總數(shù)。Thread對象的Joinall方法通過該變量判斷所有的Thread實例是否執(zhí)行結(jié)束;
sNum_cond是用來同步對sNum的操作的條件變量,也是一個靜態(tài)變量。
主要函數(shù)成員說明:
Start方法:
/* -------------------------------------------------------------------
* Start the object's thread execution. Increments thread
* count, spawns new thread, and stores thread ID.
* ------------------------------------------------------------------- */
void Thread::Start( void ) {
if ( EqualID( mTID, ZeroID() ) ) {
// increment thread count
sNum_cond.Lock();
sNum++;
sNum_cond.Unlock();
Thread* ptr = this;
// pthreads -- spawn new thread
int err = pthread_create( &mTID, NULL, Run_Wrapper, ptr );
FAIL( err != 0, "pthread_create" );
}
} // end Start
首 先通過Num++紀錄一個新的線程的產(chǎn)生,之后通過pthread_create系統(tǒng)調(diào)用產(chǎn)生一個新的線程。新線程執(zhí)行Run_Wrapper函數(shù),以至 向該Thread實例的ptr指針作為參數(shù)。原線程在判斷pthread_create是否成功后退出Start函數(shù)。
Stop方法:
/* -------------------------------------------------------------------
* Stop the thread immediately. Decrements thread count and
* resets the thread ID.
* ------------------------------------------------------------------- */
void Thread::Stop( void ) {
if ( ! EqualID( mTID, ZeroID() ) ) {
// decrement thread count
sNum_cond.Lock();
sNum--;
sNum_cond.Signal();
sNum_cond.Unlock();
nthread_t oldTID = mTID;
mTID = ZeroID();
// exit thread
// use exit() if called from within this thread
// use cancel() if called from a different thread
if ( EqualID( pthread_self(), oldTID ) ) {
pthread_exit( NULL );
} else {
// Cray J90 doesn't have pthread_cancel; Iperf works okay without
pthread_cancel( oldTID );
}
}
} // end Stop
首先通過sNum--紀錄一個線程執(zhí)行結(jié)束,并通過sNum_cond的Signal方法激活此時wait在 sNum_cond的線程(某個主線程會調(diào)用調(diào)用Joinall方法,等待全部線程的結(jié)束,在Joinall方法中通過sNum_cond.Wait() 等待在sNum_cond條件變量上)。若結(jié)束的線程是自身,則調(diào)用pthread_exit函數(shù)結(jié)束,否則調(diào)用pthread_cancel函數(shù)。注 意:傳統(tǒng)的exit函數(shù)會結(jié)束整個進程(即該進程的全部線程)的運行,而pthread_exit函數(shù)僅結(jié)束該線程的運行。
Run_Wrapper方法:
/* -------------------------------------------------------------------
* Low level function which starts a new thread, called by
* Start(). The argument should be a pointer to a Thread object.
* Calls the virtual Run() function for that object.
* Upon completing, decrements thread count and resets thread ID.
* If the object is deallocated immediately after calling Start(),
* such as an object created on the stack that has since gone
* out-of-scope, this will obviously fail.
* [static]
* ------------------------------------------------------------------- */
void*
Thread::Run_Wrapper( void* paramPtr ) {
assert( paramPtr != NULL );
Thread* objectPtr = (Thread*) paramPtr;
// run (pure virtual function)
objectPtr->Run();
#ifdef HAVE_POSIX_THREAD
// detach Thread. If someone already joined it will not do anything
// If noone has then it will free resources upon return from this
// function (Run_Wrapper)
pthread_detach(objectPtr->mTID);
#endif
// set TID to zero, then delete it
// the zero TID causes Stop() in the destructor not to do anything
objectPtr->mTID = ZeroID();
if ( objectPtr->mDeleteSelf ) {
DELETE_PTR( objectPtr );
}
// decrement thread count and send condition signal
// do this after the object is destroyed, otherwise NT complains
sNum_cond.Lock();
sNum--;
sNum_cond.Signal();
sNum_cond.Unlock();
return NULL;
} // end run_wrapper
該方法是一個外包函數(shù)(wrapper),其主要功能是調(diào)用本實例的Run方法。實際上, Run_Wrapper是一個靜態(tài)成員函數(shù),是為所有的Thread實例所共有的,因此無法使用this指針。調(diào)用Run_Wrapper的Thread 是通過參數(shù)paramPtr指明具體的Thread實例的。在Run返回之后,通過pthread_detach使該線程在運行結(jié)束以后可以釋放資源。 Joinall函數(shù)是通過監(jiān)視sNum的數(shù)值等待所有線程運行結(jié)束的,而并非通過pthread_join函數(shù)。在完成清理工作后, Run_Wrapper減少sNum的值,并通過sNum_cond.Signal函數(shù)通知在Joinall中等待的線程。
Run方法:
從Run方法的聲明中知道,該方法是一個純虛函數(shù),因此Thread是一個抽象基類,主要 作用是為其派生類提供統(tǒng)一的對外接口。在Thread的派生類中,像Iperf中的Server,Client,Speader,Audience, Listener等類,都會為Run提供特定的實現(xiàn),完成不同的功能,這是對面向?qū)ο笤O(shè)計多態(tài)特性的運用。Thread函數(shù)通過Run方法提供了一個通用 的線程接口。
討論: 為什么要通過Run_Wrapper函數(shù)間接的調(diào)用Run函數(shù)?
首先,Thread的各派生類的完成的功能不同,但它們都是Thread的實例,都有一些相同的工作要做,如初始化和清理等。在Run_Wrapper中實現(xiàn)這些作為Thread實例所應(yīng)有的相同功能,在Run函數(shù)中實現(xiàn)派生類各自不同的功能,是比較合理的設(shè)計。
更重要的是,由于要通過Pthread_create函數(shù)調(diào)用Run_Wrapper函數(shù),因此 Run_Wrapper函數(shù)必須是一個靜態(tài)成員,無法使用this指針區(qū)分運行Run_Wrapper函數(shù)的具體實例,也就無法利用多態(tài)的特性。而這個問 題可以通過把this指針作為Run_Wrapper函數(shù)的參數(shù),并在Run_Wrapper中顯示調(diào)用具有多態(tài)特性的Run函數(shù)來解決。
這種使用一個wrapper函數(shù)的技術(shù)為我們提供了一種將C++面向?qū)ο缶幊毯蛡鹘y(tǒng)的Unix系統(tǒng)調(diào)用相結(jié)合的思路。
Joinall方法和SetDaemon方法:
/* -------------------------------------------------------------------
* Wait for all thread object's execution to complete. Depends on the
* thread count being accurate and the threads sending a condition
* signal when they terminate.
* [static]
* ------------------------------------------------------------------- */
void Thread::Joinall( void ) {
sNum_cond.Lock();
while ( sNum > 0 ) {
sNum_cond.Wait();
}
sNum_cond.Unlock();
} // end Joinall
/* -------------------------------------------------------------------
* set a thread to be daemon, so joinall won't wait on it
* this simply decrements the thread count that joinall uses,
* which is not a thorough solution, but works for the moment
* ------------------------------------------------------------------- */
void Thread::SetDaemon( void ) {
sNum_cond.Lock();
sNum--;
sNum_cond.Signal();
sNum_cond.Unlock();
}
由這兩個方法的實現(xiàn)可見,Thread類是通過計數(shù)器sNum監(jiān)視運行的線程數(shù)的。線程開始前(Start方法 中的pthread_create)sNum加一,線程結(jié)束后(Stop方法和Run_Wrapper方法末尾)sNum減一。Joinall通過條件變 量類的實例sNum_cond的Wait方法等待sNum的值改變。而SetDaemon的目的是使調(diào)用線程不再受主線程Joinall的約束,只是簡單 的把sNum減一就可以了。