用戶類進程之間使用信號量(semaphore)進行同步,內(nèi)核線程之間也使用了信號量。信號量與自旋鎖類似,保護臨界區(qū)代碼。但信號量與自旋鎖有一定的區(qū)別,信號量在無法得到資源時,內(nèi)核線程處于睡眠阻塞狀態(tài),而自旋鎖處于忙等待狀態(tài)。因此,如果資源被占用時間很短時,使用自旋鎖較好,因為它可節(jié)約調(diào)度時間。如果資源被占用的時間較長,使用信號量較好,因為可讓CPU調(diào)度去做其它進程的工作。 操作信號量的API函數(shù)說明如表6。
樣例:信號量的使用 下面函數(shù)do_utimes利用信號量防止多個線程對文件系統(tǒng)節(jié)點inode同時進行訪問。其列出如下(在fs/open.c中): long do_utimes(char __user * filename, struct timeval * times) { struct inode * inode; …… down(&inode->i_sem); //獲取信號量 error = notify_change(nd.dentry, &newattrs);//修改inode中值 up(&inode->i_sem); //釋放信號量 …… } 下面說明信號量API函數(shù)。 (1)信號量結構semaphore信號量用結構semaphore描述,它在自旋鎖的基礎上改進而成,它包括一個自旋鎖、信號量計數(shù)器和一個等待隊列。用戶程序只能調(diào)用信號量API函數(shù),而不能直接訪問信號量結構,其列出如下(在include/linux/semaphore.h中): struct semaphore { spinlock_t lock; unsigned int count; struct list_head wait_list; }; (2)初始化函數(shù)sema_init函數(shù)sema_init初始化信號量,將信號量值初始化為n,其列出如下: static inline void sema_init(struct semaphore *sem, int val) { static struct lock_class_key __key; *sem = (struct semaphore) __SEMAPHORE_INITIALIZER(*sem, val); /*初始化一個鎖的實例,用于調(diào)試中獲取信號量的調(diào)試信息*/ lockdep_init_map(&sem->lock.dep_map, "semaphore->lock", &__key, 0); } #define __SEMAPHORE_INITIALIZER(name, n) { .lock = __SPIN_LOCK_UNLOCKED?lock), \ //初始化自旋鎖 .count = n, \ //將信號量計數(shù)器賦值為n .wait_list = LIST_HEAD_INIT((name).wait_list), \ //初始化等待隊列 } (3)可中斷獲取信號量函數(shù)down_interruptible函數(shù)down_interruptible獲取信號量,存放在參數(shù)sem中。它嘗試獲取信號量,如果其他線程被允許嘗試獲取此信號量,則將本線程睡眠等待。如果有一個信號中斷睡眠,則它返回錯誤-EINTR。如果成功獲取信號量,函數(shù)返回0。 函數(shù)down_interruptible列出如下(在kernel/semaphore.c中): int down_interruptible(struct semaphore *sem) { unsigned long flags; int result = 0; spin_lock_irqsave(&sem->lock, flags); //獲取自旋鎖,關閉中斷,將狀態(tài)寄存器值存放在flags /*如果信號量計數(shù)器值大于0,說明有多個空閑資源可訪問,可以成功獲取信號量了*/ if (likely(sem->count > 0)) //likely表示成功獲取的概率大,通知編譯器進行分支預測優(yōu)化 sem->count--; else result = __down_interruptible(sem); //進入睡眠等待 spin_unlock_irqrestore(&sem->lock, flags); return result; } static noinline int __sched __down_interruptible(struct semaphore *sem) { return __down_common(sem, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT); } 函數(shù)__down_common進入睡眠等待,其列出如下: static inline int __sched __down_common(struct semaphore *sem, long state, long timeout) { struct task_struct *task = current; struct semaphore_waiter waiter; list_add_tail(&waiter.list, &sem->wait_list); //加入到等待隊列 waiter.task = task; waiter.up = 0; for (;;) { if (state == TASK_INTERRUPTIBLE && signal_pending(task)) goto interrupted; if (state == TASK_KILLABLE && fatal_signal_pending(task)) goto interrupted; if (timeout <= 0) goto timed_out; __set_task_state(task, state); spin_unlock_irq(&sem->lock); timeout = schedule_timeout(timeout); //調(diào)度 spin_lock_irq(&sem->lock); if (waiter.up) return 0; } timed_out: list_del(&waiter.list); return -ETIME; interrupted: list_del(&waiter.list); return -EINTR; } (3)釋放信號量函數(shù)up函數(shù)up在沒有其他線程等待使用信號量的情況下釋放信號量,否則,喚醒其他等待線程。其列出如下: void up(struct semaphore *sem) { unsigned long flags; spin_lock_irqsave(&sem->lock, flags); /*判斷是否有線程等待在此信號量上,即判斷等待隊列是否為空*/ if (likely(list_empty(&sem->wait_list))) /*沒有線程等待此信號量,釋放信號量,將信號量計數(shù)器加1,表示增加了1個空閑資源*/ sem->count++; else __up(sem); /*將本線程從等待隊列刪除,喚醒等待此信號量的其他線程*/ spin_unlock_irqrestore(&sem->lock, flags); } static noinline void __sched __up(struct semaphore *sem) { struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list, struct semaphore_waiter, list); list_del(&waiter->list); //將本線程從等待隊列刪除 waiter->up = 1; wake_up_process(waiter->task); //喚醒等待此信號量的其他線程 } 互斥鎖信號量的初始值表示可以有多少個任務可同時訪問的共享資源,如果初始值為1,表示只有1個任務可以訪問,信號量變成互斥鎖(Mutex)??梢娀コ怄i是信號量的特例。 互斥鎖(mutex)是在原子操作API的基礎上實現(xiàn)的信號量行為。互斥鎖不能進行遞歸鎖定或解鎖,能用于交互上下文,同一時間只能有一個任務持有互斥鎖。 互斥鎖功能上基本上與信號量一樣,互斥鎖占用空間比信號量小,運行效率比信號量高?;コ怄i的API函數(shù)功能說明如表1。
互斥鎖用結構mutex描述,它含有信號量計數(shù)和等待隊列成員,信號量的值為1或0或負數(shù)。其列出如下(在include/linux/mutex.h中): struct mutex { /* 1:表示解鎖,0:表示鎖住,負數(shù):表示鎖住,可能有等待者*/ atomic_t count; spinlock_t wait_lock; /*操作等待隊列的自旋鎖*/ struct list_head wait_list; /*等待隊列*/ /*省略了用于調(diào)試的結構成員*/ }; 讀/寫信號量讀/寫信號量適于在讀多寫少的情況下使用。如果一個任務需要讀和寫操作時,它將被看作寫者,在不需要寫操作的情況下可降級為讀者。任意多個讀者可同時擁有一個讀/寫信號量,對臨界區(qū)代碼進行操作。 在沒有寫者操作時,任何讀者都可成功獲得讀/寫信號量進行讀操作。如果有寫者在操作時,讀者必須被掛起等待直到寫者釋放該信號量。在沒有寫者或讀者操作時,寫者必須等待前面的寫者或讀者釋放該信號量后,才能訪問臨界區(qū)。寫者獨占臨界區(qū),排斥其他的寫者和讀者,而讀者只排斥寫者。 讀/寫信號量可通過依賴硬件架構或純軟件代碼兩種方式實現(xiàn)。下面只說明純軟件代碼實現(xiàn)方式。 (1)API說明用戶可通過調(diào)用讀/寫信號量API實現(xiàn)讀/寫操作的同步。讀/寫信號量API說明如表1。
(2)讀/寫信號量結構rw_semaphore讀/寫信號量結構rw_semaphore描述了讀/寫信號量的值和等待隊列,其列出如下(在include/linux/rwsem-spinlock.h中): struct rw_semaphore { /*讀/寫信號量定義: * - 如果activity為0,那么沒有激活的讀者或?qū)懻摺? * - 如果activity為+ve,那么將有ve個激活的讀者。 * - 如果activity為-1,那么將有1個激活的寫者。 */ __s32 activity; /*信號量值*/ spinlock_t wait_lock; /*用于鎖等待隊列wait_list*/ struct list_head wait_list; /*如果非空,表示有進程等待該信號量*/ #ifdef CONFIG_DEBUG_LOCK_ALLOC /*用于鎖調(diào)試*/ struct lockdep_map dep_map; #endif }; (3)讀者加鎖/解鎖操作實現(xiàn)分析加讀者鎖操作讀者加鎖函數(shù)down_read用于加讀者鎖,如果沒有寫者操作時,等待隊列為空,讀者可以加讀者鎖,將信號量的讀者計數(shù)加1。如果有寫在操作時,等待隊列非空,讀者需要等待寫者操作完成。函數(shù)down_read列出如下(在kernel/rwsem.c中): void __sched down_read(struct rw_semaphore *sem) { might_sleep(); /*用于調(diào)試自旋鎖睡眠*/ rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_); /*確認獲得鎖,用于調(diào)試*/ /*跟蹤鎖狀態(tài)信息(如:鎖深度),用于調(diào)試*/ LOCK_CONTENDED(sem, __down_read_trylock, __down_read); } 函數(shù)__down_read 完成加讀者的具體操作,其列出如下(在lib/rwsem-spinlock.c中): void __sched __down_read(struct rw_semaphore *sem) { struct rwsem_waiter waiter; struct task_struct *tsk; spin_lock_irq(&sem->wait_lock); /*如果有0或多個讀者,并且等待隊列為空,就可以獲取sem*/ if (sem->activity >= 0 && list_empty(&sem->wait_list)) { /* 獲得sem */ sem->activity++; /*讀者計數(shù)加1*/ spin_unlock_irq(&sem->wait_lock); goto out; } /*運行到這里,說明不能獲取sem,將當前進程加入等待隊列進行等待*/ tsk = current; set_task_state(tsk, TASK_UNINTERRUPTIBLE); /* 建立等待隊列成員*/ waiter.task = tsk; waiter.flags = RWSEM_WAITING_FOR_READ; /*表示等待讀操作*/ get_task_struct(tsk); /*進程使用計數(shù)加1*/ list_add_tail(&waiter.list, &sem->wait_list); /*將等待成員加到等待隊列尾*/ /* 不再需要訪問等待隊列,因此,這里解鎖*/ spin_unlock_irq(&sem->wait_lock); /* 讀者等待獲取sem */ for (;;) { if (!waiter.task) break; schedule(); set_task_state(tsk, TASK_UNINTERRUPTIBLE); } /*運行這里,退出等待,說明可以獲取sem了*/ tsk->state = TASK_RUNNING; out: ; } 解讀者鎖操作函數(shù)up_read釋放讀者鎖,如果等待隊列非空,說明有寫者在等待,就從等待隊列喚醒一個寫者。其列出如下(在kernel/rwsem.c中): void up_read(struct rw_semaphore *sem) { rwsem_release(&sem->dep_map, 1, _RET_IP_); /*獲取解鎖信息,用于調(diào)試*/ __up_read(sem); } 函數(shù)__up_read是釋放讀者鎖的具體操作函數(shù),其列出如下: void __up_read(struct rw_semaphore *sem) { unsigned long flags; spin_lock_irqsave(&sem->wait_lock, flags); /*如果所有讀者完成讀操作,并且有寫者等待,那么喚醒一個寫者*/ if (--sem->activity == 0 && !list_empty(&sem->wait_list)) sem = __rwsem_wake_one_writer(sem); spin_unlock_irqrestore(&sem->wait_lock, flags); } /*喚醒一個寫者*/ static inline struct rw_semaphore * __rwsem_wake_one_writer(struct rw_semaphore *sem) { struct rwsem_waiter *waiter; struct task_struct *tsk; sem->activity = -1; /*表示有一個寫者正在寫操作*/ /*獲取一個等待者*/ waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list); list_del(&waiter->list); /*將該等待者從等待隊列刪除*/ tsk = waiter->task; smp_mb(); /*加內(nèi)存屏障,確保完成上面的指針引用操作*/ waiter->task = NULL; wake_up_process(tsk); /*喚醒進程*/ put_task_struct(tsk); /*進程上下文使用計數(shù)減1*/ return sem; } (3)寫者加鎖/解鎖操作實現(xiàn)分析加寫者鎖操作函數(shù)down_write完成加寫者鎖操作,其列出如下: void __sched down_write(struct rw_semaphore *sem) { might_sleep(); rwsem_acquire(&sem->dep_map, 0, 0, _RET_IP_); LOCK_CONTENDED(sem, __down_write_trylock, __down_write); } void __sched __down_write(struct rw_semaphore *sem) { __down_write_nested(sem, 0); } 函數(shù)__down_write_nested完成加寫者鎖的具體操作。當沒有讀者或?qū)懻卟僮鲿r,寫者才可以獲取寫者鎖。寫者鎖是獨占的。如果有其他寫者或讀者操作時,寫者必須等待。其列出如下: void __sched __down_write_nested(struct rw_semaphore *sem, int subclass) { struct rwsem_waiter waiter; struct task_struct *tsk; spin_lock_irq(&sem->wait_lock); /*如果沒有讀者,并且等待隊列為空(說明沒有寫者)時,寫者才能獲取寫者鎖*/ if (sem->activity == 0 && list_empty(&sem->wait_list)) { /* 獲取寫者鎖*/ sem->activity = -1; spin_unlock_irq(&sem->wait_lock); goto out; } /*運行到這里,說明有讀者或?qū)懻咴诓僮?,需要等?/ tsk = current; set_task_state(tsk, TASK_UNINTERRUPTIBLE); /* 建立等待隊列成員*/ waiter.task = tsk; waiter.flags = RWSEM_WAITING_FOR_WRITE; /*標識為等待寫操作*/ get_task_struct(tsk); /*進程上下文使用計數(shù)加1*/ list_add_tail(&waiter.list, &sem->wait_list); /*加到等待隊列尾*/ spin_unlock_irq(&sem->wait_lock); /* 進行等待*/ for (;;) { if (!waiter.task) break; schedule(); set_task_state(tsk, TASK_UNINTERRUPTIBLE); } /*被喚醒*/ tsk->state = TASK_RUNNING; out: ; } 解寫者鎖操作函數(shù)up_write釋放寫者鎖,將讀者計數(shù)設置為0,其列出如下: void up_write(struct rw_semaphore *sem) { rwsem_release(&sem->dep_map, 1, _RET_IP_); __up_write(sem); } void __up_write(struct rw_semaphore *sem) { unsigned long flags; spin_lock_irqsave(&sem->wait_lock, flags); sem->activity = 0; /*表示有0個讀者*/ if (!list_empty(&sem->wait_list)) sem = __rwsem_do_wake(sem, 1); /*喚醒等待者*/ spin_unlock_irqrestore(&sem->wait_lock, flags); } |