Linux內(nèi)核中的等待隊列
Linux內(nèi)核的等待隊列是以雙循環(huán)鏈表為基礎(chǔ)數(shù)據(jù)結(jié)構(gòu),與進程調(diào)度機制緊密結(jié)合,能夠用于實現(xiàn)核心的異步事件通知機制。在Linux2.4.21中,等待隊列在源代碼樹include/linux/wait.h中,這是一個通過list_head連接的典型雙循環(huán)鏈表,如下圖所示。
在這個鏈表中,有兩種數(shù)據(jù)結(jié)構(gòu):等待隊列頭(wait_queue_head_t)和等待隊列項(wait_queue_t)。等待隊列頭和等待隊列項中都包含一個list_head類型的域作為"連接件"。由于我們只需要對隊列進行添加和刪除操作,并不會修改其中的對象(等待隊列項),因此,我們只需要提供一把保護整個基礎(chǔ)設(shè)施和所有對象的鎖,這把鎖保存在等待隊列頭中,為wq_lock_t類型。在實現(xiàn)中,可以支持讀寫鎖(rwlock)或自旋鎖(spinlock)兩種類型,通過一個宏定義來切換。如果使用讀寫鎖,將wq_lock_t定義為rwlock_t類型;如果是自旋鎖,將wq_lock_t定義為spinlock_t類型。無論哪種情況,分別相應(yīng)設(shè)置wq_read_lock、wq_read_unlock、wq_read_lock_irqsave、wq_read_unlock_irqrestore、wq_write_lock_irq、wq_write_unlock、wq_write_lock_irqsave和wq_write_unlock_irqrestore等宏。
等待隊列頭
struct __wait_queue_head {
wq_lock_t lock;
struct list_head task_list;
};
typedef struct __wait_queue_head wait_queue_head_t;
前面已經(jīng)說過,等待隊列的主體是進程,這反映在每個等待隊列項中,是一個任務(wù)結(jié)構(gòu)指針(struct task_struct * task)。flags為該進程的等待標志,當(dāng)前只支持互斥。
等待隊列項
struct __wait_queue {
unsigned int flags;
#define WQ_FLAG_EXCLUSIVE 0x01
struct task_struct * task;
struct list_head task_list;
};
typedef struct __wait_queue wait_queue_t;
聲明和初始化
#define DECLARE_WAITQUEUE(name, tsk) \
wait_queue_t name = __WAITQUEUE_INITIALIZER(name, tsk)
#define __WAITQUEUE_INITIALIZER(name, tsk) { \
task: tsk, \
task_list: { NULL, NULL }, \
__WAITQUEUE_DEBUG_INIT(name)}
通過DECLARE_WAITQUEUE宏將等待隊列項初始化成對應(yīng)的任務(wù)結(jié)構(gòu),并且用于連接的相關(guān)指針均設(shè)置為空。其中加入了調(diào)試相關(guān)代碼。
#define DECLARE_WAIT_QUEUE_HEAD(name) \
wait_queue_head_t name = __WAIT_QUEUE_HEAD_INITIALIZER(name)
#define __WAIT_QUEUE_HEAD_INITIALIZER(name) { \
lock: WAITQUEUE_RW_LOCK_UNLOCKED, \
task_list: { &(name).task_list, &(name).task_list }, \
__WAITQUEUE_HEAD_DEBUG_INIT(name)}
通過DECLARE_WAIT_QUEUE_HEAD宏初始化一個等待隊列頭,使得其所在鏈表為空,并設(shè)置鏈表為"未上鎖"狀態(tài)。其中加入了調(diào)試相關(guān)代碼。
static inline void init_waitqueue_head(wait_queue_head_t *q)
該函數(shù)初始化一個已經(jīng)存在的等待隊列頭,它將整個隊列設(shè)置為"未上鎖"狀態(tài),并將鏈表指針prev和next指向它自身。
{
q->lock = WAITQUEUE_RW_LOCK_UNLOCKED;
INIT_LIST_HEAD(&q->task_list);
}
static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)
該函數(shù)初始化一個已經(jīng)存在的等待隊列項,它設(shè)置對應(yīng)的任務(wù)結(jié)構(gòu),同時將標志位清0。
{
q->flags = 0;
q->task = p;
}
static inline int waitqueue_active(wait_queue_head_t *q)
該函數(shù)檢查等待隊列是否為空。
{
return !list_empty(&q->task_list);
}
static inline void __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new)
將指定的等待隊列項new添加到等待隊列頭head所在的鏈表頭部,該函數(shù)假設(shè)已經(jīng)獲得鎖。
{
list_add(&new->task_list, &head->task_list);
}
static inline void __add_wait_queue_tail(wait_queue_head_t *head, wait_queue_t *new)
將指定的等待隊列項new添加到等待隊列頭head所在的鏈表尾部,該函數(shù)假設(shè)已經(jīng)獲得鎖。
{
list_add_tail(&new->task_list, &head->task_list);
}
static inline void __remove_wait_queue(wait_queue_head_t *head, wait_queue_t *old)
將函數(shù)從等待隊列頭head所在的鏈表中刪除指定等待隊列項old,該函數(shù)假設(shè)已經(jīng)獲得鎖,并且old在head所在鏈表中。
{
list_del(&old->task_list);
}
睡眠和喚醒操作
對等待隊列的操作包括睡眠和喚醒(相關(guān)函數(shù)保存在源代碼樹的/kernel/sched.c和include/linux/sched.h中)。思想是更改當(dāng)前進程(CURRENT)的任務(wù)狀態(tài),并要求重新調(diào)度,因為這時這個進程的狀態(tài)已經(jīng)改變,不再在調(diào)度表的就緒隊列中,因此無法再獲得執(zhí)行機會,進入"睡眠"狀態(tài),直至被"喚醒",即其任務(wù)狀態(tài)重新被修改回就緒態(tài)。
常用的睡眠操作有interruptible_sleep_on和sleep_on。兩個函數(shù)類似,只不過前者將進程的狀態(tài)從就緒態(tài)(TASK_RUNNING)設(shè)置為TASK_INTERRUPTIBLE,允許通過發(fā)送signal喚醒它(即可中斷的睡眠狀態(tài));而后者將進程的狀態(tài)設(shè)置為TASK_UNINTERRUPTIBLE,在這種狀態(tài)下,不接收任何singal。以interruptible_sleep_on為例,其展開后的代碼是:
void interruptible_sleep_on(wait_queue_head_t *q)
{
unsigned long flags;
wait_queue_t wait;
/* 構(gòu)造當(dāng)前進程對應(yīng)的等待隊列項 */
init_waitqueue_entry(&wait, current);
/* 將當(dāng)前進程的狀態(tài)從TASK_RUNNING改為TASK_INTERRUPTIBLE */
current->state = TASK_INTERRUPTIBLE;
/* 將等待隊列項添加到指定鏈表中 */
wq_write_lock_irqsave(&q->lock,flags);
__add_wait_queue(q, &wait);
wq_write_unlock(&q->lock);
/* 進程重新調(diào)度,放棄執(zhí)行權(quán) */
schedule();
/* 本進程被喚醒,重新獲得執(zhí)行權(quán),首要之事是將等待隊列項從鏈表中刪除 */
wq_write_lock_irq(&q->lock);
__remove_wait_queue(q, &wait);
wq_write_unlock_irqrestore(&q->lock,flags);
/* 至此,等待過程結(jié)束,本進程可以正常執(zhí)行下面的邏輯 */
}
對應(yīng)的喚醒操作包括wake_up_interruptible和wake_up。wake_up函數(shù)不僅可以喚醒狀態(tài)為TASK_UNINTERRUPTIBLE的進程,而且可以喚醒狀態(tài)為TASK_INTERRUPTIBLE的進程。wake_up_interruptible只負責(zé)喚醒狀態(tài)為TASK_INTERRUPTIBLE的進程。這兩個宏的定義如下:
#define wake_up(x) __wake_up((x),TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE, 1)
#define wake_up_interruptible(x) __wake_up((x),TASK_INTERRUPTIBLE, 1)
__wake_up函數(shù)主要是獲取隊列操作的鎖,具體工作是調(diào)用__wake_up_common完成的。
void __wake_up(wait_queue_head_t *q, unsigned int mode, int nr)
{
if (q) {
unsigned long flags;
wq_read_lock_irqsave(&q->lock, flags);
__wake_up_common(q, mode, nr, 0);
wq_read_unlock_irqrestore(&q->lock, flags);
}
}
/*The core wakeup function. Non-exclusive wakeups (nr_exclusive == 0)just wake everything up. If it's an exclusive wakeup (nr_exclusive ==small +ve number) then we wake all the non-exclusive tasks and oneexclusive task.
There are circumstances in which we can try to wakea task which has already started to run but is not in stateTASK_RUNNING. try_to_wake_up() returns zero in this (rare) case, andwe handle it by contonuing to scan the queue. */
static inline void __wake_up_common (wait_queue_head_t *q, unsigned int mode, int nr_exclusive, const int sync)
參數(shù)q表示要操作的等待隊列,mode表示要喚醒任務(wù)的狀態(tài),如TASK_UNINTERRUPTIBLE或TASK_INTERRUPTIBLE等。nr_exclusive是要喚醒的互斥進程數(shù)目,在這之前遇到的非互斥進程將被無條件喚醒。sync表示???
{
struct list_head *tmp;
struct task_struct *p;
CHECK_MAGIC_WQHEAD(q);
WQ_CHECK_LIST_HEAD(&q->task_list);
/* 遍歷等待隊列 */
list_for_each(tmp,&q->task_list) {
unsigned int state;
/* 獲得當(dāng)前等待隊列項 */
wait_queue_t *curr = list_entry(tmp, wait_queue_t, task_list);
CHECK_MAGIC(curr->__magic);
/* 獲得對應(yīng)的進程 */
p = curr->task;
state = p->state;
/* 如果我們需要處理這種狀態(tài)的進程 */
if (state & mode) {
WQ_NOTE_WAKER(curr);
if (try_to_wake_up(p, sync) && (curr->flags&WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
}
}
/* 喚醒一個進程,將它放到運行隊列中,如果它還不在運行隊列的話。"當(dāng)前"進程總是在運行隊列中的(except when theactual re-schedule is in progress),and as such you're allowed to do thesimpler "current->state = TASK_RUNNING" to mark yourself runnablewithout the overhead of this. */
static inline int try_to_wake_up(struct task_struct * p, int synchronous)
{
unsigned long flags;
int success = 0;
/* 由于我們需要操作運行隊列,必須獲得對應(yīng)的鎖 */
spin_lock_irqsave(&runqueue_lock, flags);
/* 將進程狀態(tài)設(shè)置為TASK_RUNNING */
p->state = TASK_RUNNING;
/* 如果進程已經(jīng)在運行隊列中,釋放鎖退出 */
if (task_on_runqueue(p))
goto out;
/* 否則將進程添加到運行隊列中 */
add_to_runqueue(p);
/* 如果設(shè)置了同步標志 */
if (!synchronous || !(p->cpus_allowed & (1UL << smp_processor_id())))
reschedule_idle(p);
/* 喚醒成功,釋放鎖退出 */
success = 1;
out:
spin_unlock_irqrestore(&runqueue_lock, flags);
return success;
}
等待隊列應(yīng)用模式
等待隊列的的應(yīng)用涉及兩個進程,假設(shè)為A和B。A是資源的消費者,B是資源的生產(chǎn)者。A在消費的時候必須確保資源已經(jīng)生產(chǎn)出來,為此定義一個資源等待隊列。這個隊列同時要被進程A和進程B使用,我們可以將它定義為一個全局變量。
DECLARE_WAIT_QUEUE_HEAD(rsc_queue); /* 全局變量 */
在進程A中,執(zhí)行邏輯如下:
while (resource is unavaiable) {
interruptible_sleep_on( &wq );
}
consume_resource();
在進程B中,執(zhí)行邏輯如下:
produce_resource();
wake_up_interruptible( &wq );