加鎖可以分為三個(gè)階段:
現(xiàn)有如下場景:
三個(gè)人去銀行的一個(gè)窗口辦理業(yè)務(wù),一個(gè)窗口同一時(shí)刻只能接待一位顧客。抽象成代碼就是:
public static void main(String[] args){
ReentrantLock lock = new ReentrantLock();
new Thread(() -> {
lock.lock();
try {
System.out.println("顧客A辦理業(yè)務(wù)");
TimeUnit.MINUTES.sleep(5);
} catch (Exception e){
e.printStackTrace();
} finally {
lock.unlock();
}
}, "A").start();
new Thread(() -> {
lock.lock();
try {
System.out.println("顧客B辦理業(yè)務(wù)");
} catch (Exception e){
e.printStackTrace();
} finally {
lock.unlock();
}
}, "B").start();
new Thread(() -> {
lock.lock();
try {
System.out.println("顧客C辦理業(yè)務(wù)");
} catch (Exception e){
e.printStackTrace();
} finally {
lock.unlock();
}
}, "C").start();
}
1. lock 方法:
調(diào)用lock.lock()
的時(shí)候,實(shí)際上調(diào)用的是 NonfairSync 的 lock 方法,如下:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
compareAndSetState(0, 1)
這里比較并交換,期望 AQS 中的 state 的值是0,如果是,就將其修改成1,返回 true ,否則返回 false 。
返回 true 的時(shí)候,執(zhí)行 setExclusiveOwnerThread(Thread.currentThread())
把占用資源的線程設(shè)置成當(dāng)前線程。當(dāng)線程A進(jìn)來的時(shí)候,因?yàn)?AQS 中的 state 還沒別的線程去修改,所以是0,就會(huì)成功修改成 1,就直接加鎖成功了。
返回 false 的時(shí)候,就會(huì)進(jìn)入 esle 塊中,執(zhí)行 acquire(1)
方法。假如線程A加鎖成功還沒釋放鎖的時(shí)候,線程B進(jìn)來了,那么就會(huì)返回 false 。 acquire(1)
方法又主要包括三個(gè)方法,源碼如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
接下來依次來看這三個(gè)方法。
2. tryAcquire(arg) 方法:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
再點(diǎn)進(jìn)去看 nonfairTryAcquire(acquires)
:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
線程B進(jìn)到這段代碼就有三種情況:
currentThread 就是線程B,c 就是 AQS 中的 state,即1,c 不等于0,所以跑到 else if 中,判斷當(dāng)前線程和持有鎖的線程是否相同。current 是 B,而當(dāng)前持有鎖的是 A,也不相等,所以直接 return false。
如果線程B進(jìn)來的時(shí)候 A 剛好走了,即 c 等于0,那么進(jìn)到 if 里面。if 里面做的事就是再將 state 改成1,同時(shí)設(shè)置當(dāng)前占有鎖的線程為 B,然后返回 true;
如果當(dāng)前線程等于當(dāng)前占有鎖的線程,即進(jìn)來的還是線程A,那么就修改 state 的值(當(dāng)前值加1),然后返回 true。
3. addWaiter(Node.EXCLUSIVE) 方法:
上面說了 tryAcquire(arg)
方法,當(dāng)該方法返回 false 的時(shí)候,就會(huì)執(zhí)行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
方法,那么先看它里面的 addWaiter(Node.EXCLUSIVE)
方法,源碼如下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
無特殊說明的時(shí)候,以下情況基于:當(dāng)前持有鎖的是線程A,并且還沒釋放,進(jìn)來的是線程B。
這個(gè)方法首先將線程B封裝成 Node,傳進(jìn)來的 mode 是 AQS 中的一個(gè)屬性,還沒哪里賦過值,所以是 null,當(dāng)前的 tail 其實(shí)也是 null,因?yàn)?AQS 隊(duì)列中現(xiàn)在還沒別的等待線程。是 null 的話,就執(zhí)行入隊(duì)方法 enq(node)
,該方法如下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
for (; ;)
其實(shí)就是相當(dāng)于 while(true)
,進(jìn)行自旋。當(dāng)前的 tail 是 null ,所以進(jìn)入 if 中,這里有個(gè) compareAndSetHead(new Node())
方法,這里是 new 了一個(gè)節(jié)點(diǎn),姑且叫它傀儡節(jié)點(diǎn),將它設(shè)置為頭結(jié)點(diǎn),如果 new 成功了,尾結(jié)點(diǎn)也指向它。效果如下圖:
第二次循環(huán)的時(shí)候,t 就是不再是 null 了,而是指向剛才那個(gè)傀儡節(jié)點(diǎn)了,所以進(jìn)入 else 中。else 中做的事就是,將傳進(jìn)來的節(jié)點(diǎn),即封裝了線程B的節(jié)點(diǎn) node,將其 prev 設(shè)置成剛才new 的那個(gè)傀儡節(jié)點(diǎn),再將 tail 指向 封裝了線程B的 node;再將傀儡節(jié)點(diǎn)的 next 指針指向封裝了線程B的 node,如下圖:
當(dāng)線程C進(jìn)到 addWaiter(Node mode)
方法,此時(shí) tail 不是 null 了,已經(jīng)指向了 線程B的節(jié)點(diǎn),所以進(jìn)入到 if 塊里面,執(zhí)行:
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
這里就是將線程C的 prev 設(shè)置為當(dāng)前的 tail,即線程B的 node,然后將線程C設(shè)置為 tail,再將線程B的 next 設(shè)置為線程C。最后效果圖如下:
4. acquireQueued(final Node node, int arg) 方法:
再來看看這個(gè)方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這個(gè)固定的,假如傳進(jìn)來的 node 是線程B,首先會(huì)進(jìn)入自旋,看看 predecessor
這方法:
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
首先讓 p 等于 prev,此時(shí)線程B 節(jié)點(diǎn)的 prev是誰呢,直接看我上面畫的圖可以知道,線程B的 prev 就是傀儡節(jié)點(diǎn),所以這里 return 的就是傀儡節(jié)點(diǎn)。
回到外層,傀儡節(jié)點(diǎn)等于 head,所以又會(huì)執(zhí)行 tryAcquire(arg)
,即又重復(fù)上述步驟再次嘗試獲取鎖。因?yàn)榇藭r(shí)線程A持有鎖,所以線程B嘗試獲取鎖會(huì)失敗,即 tryAcquire(arg)
會(huì)返回 false。
返回 false, 那就執(zhí)行下一個(gè) if。首先看 shouldParkAfterFailedAcquire(p, node)
方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
prev 是傀儡節(jié)點(diǎn),它的 waitStatus 是0,因?yàn)榭芄?jié)點(diǎn) new 出來以后還沒改過它的 waitStatus 的值,默認(rèn)是0。Node.SIGNAL
的值是 -1,不相等,0 也不大于 0,所以進(jìn)入 else,執(zhí)行 compareAndSetWaitStatus(pred, ws, Node.SIGNAL)
,這是比較并交換,將傀儡節(jié)點(diǎn)的 waitStatus 的值由 0 改為 -1,最后返回了 false。
shouldParkAfterFailedAcquire(p, node)
方法返回了 false,因?yàn)樽孕?,所以又回?nbsp;final Node p = node.predecessor();
這一行。此時(shí) p 節(jié)點(diǎn)還是傀儡節(jié)點(diǎn),再去嘗試獲取鎖,如果線程A還是釋放,又獲取失敗了,就會(huì)再次執(zhí)行 shouldParkAfterFailedAcquire(p, node)
方法。
再次執(zhí)行 shouldParkAfterFailedAcquire(p, node)
的時(shí)候,傀儡節(jié)點(diǎn)的 waitStatus 就已經(jīng)是 -1 了,所以直接 return true。
這里 return 了 true,就會(huì)執(zhí)行 parkAndCheckInterrupt()
方法,看看這個(gè)方法源碼:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
這里的 this 就是線程B,這里調(diào)用了 park 方法,就讓線程B 在等著了。線程C進(jìn)來也一樣,執(zhí)行到這一步,就會(huì)調(diào)用 park 方法,一直在等著。當(dāng)線程A釋放鎖了,就會(huì)調(diào)用 unpark 方法,線程B和線程C就有一個(gè)可以搶到鎖了。
5. unlock 方法:
當(dāng)線程A調(diào)用了 unlock 方法,實(shí)際上調(diào)用的是:
public void unlock() {
sync.release(1);
}
點(diǎn)進(jìn)去之后是這樣的:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
再點(diǎn)進(jìn)去看 tryRelease(arg
方法:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
線程A getState 是 1,傳進(jìn)來的 releases 也是 1,所以相減結(jié)果就是 0。因?yàn)榻Y(jié)果是 0,所以會(huì)將 free 改成 true,然后調(diào)用 setExclusiveOwnerThread(null);
將當(dāng)前持有鎖的線程設(shè)置為 null。然后設(shè)置 AQS 的 state 等于 c,即等于 0,最后該方法 return true。
回到上一層,此時(shí)的 head 是傀儡節(jié)點(diǎn),不為空,并且傀儡節(jié)點(diǎn)的 waitStatus 剛才改成了 -1,不等于 0,所以會(huì)調(diào)用 unparkSuccessor(h);
方法:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
這里傳進(jìn)來的節(jié)點(diǎn)是傀儡節(jié)點(diǎn),它的 waitStatus 是 -1,小于 0,所以就會(huì)執(zhí)行 compareAndSetWaitStatus(node, ws, 0)
進(jìn)行比較并交換,又將傀儡節(jié)點(diǎn)的 waitStatus 改成了 0。
繼續(xù)往下執(zhí)行,得到的 s 就是線程B所在節(jié)點(diǎn),不為空,并且線程B節(jié)點(diǎn)的 waitStatus 還沒改過,是 0,所以直接執(zhí)行 LockSupport.unpark(s.thread)
。
因?yàn)檎{(diào)用了 unpark,所以剛才阻塞的線程B在 acquireQueued(final Node node, int arg)
方法中的自旋就繼續(xù)進(jìn)行,就會(huì)調(diào)用 tryAcquire(arg)
方法,這次因?yàn)锳已經(jīng)釋放鎖了,所以該方法會(huì)返回 true,就會(huì)執(zhí)行 if 里面的代碼:
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
看看 setHead(node)
方法:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
這里的 node 就是線程B節(jié)點(diǎn),將頭結(jié)點(diǎn)指向線程B節(jié)點(diǎn),將線程B節(jié)點(diǎn)的線程設(shè)置為空,前驅(qū)設(shè)置為空。外層再把傀儡節(jié)點(diǎn)的 next 指針設(shè)置為空,所以最終效果就是:
最終是傀儡節(jié)點(diǎn)出隊(duì),以前線程B所在節(jié)點(diǎn)成為新的傀儡節(jié)點(diǎn)。因?yàn)橹暗目芄?jié)點(diǎn)已經(jīng)沒有任何引用指向它了,就會(huì)被 GC 回收。
掃描二維碼