今天為你帶來(lái)的是 ReentrantLock 公平鎖與非公平鎖的源碼分析,它是 Java 并發(fā)包下的一個(gè) java.util.concurrent.locks 實(shí)現(xiàn)類(lèi),實(shí)現(xiàn)了 Lock 接口和 Serializable 接口。
初識(shí)
ReentrantLock 類(lèi)有兩個(gè)構(gòu)造函數(shù),一個(gè)是默認(rèn)的不帶參數(shù)的構(gòu)造函數(shù),創(chuàng)建一個(gè)默認(rèn)的非公平鎖的實(shí)現(xiàn),一個(gè)是帶參數(shù)的構(gòu)造函數(shù),根據(jù)參數(shù) fair 創(chuàng)建一個(gè)公平還是非公平的鎖。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
這里簡(jiǎn)單的說(shuō)一下公平鎖和非公平鎖的定義:
- 公平鎖:線程在同步隊(duì)列中通過(guò)先進(jìn)先出(FIFO)的方式獲取鎖,每個(gè)線程最終都能獲取鎖。
- 非公平鎖:線程可以通過(guò)插隊(duì)的方式搶占鎖,搶不到鎖就進(jìn)入同步隊(duì)列排隊(duì)。
NonfairSync 類(lèi)和 FairSync 類(lèi)繼承了 Sync 類(lèi),它們?nèi)齻€(gè)都是 ReentrantLock 的內(nèi)部類(lèi)。
AbstractQueuedSynchronizer,簡(jiǎn)稱 AQS,擁有三個(gè)核心組件:
- state:volatile 修飾,線程是否可以獲取鎖。
- Node:內(nèi)部隊(duì)列,雙向鏈表形式,沒(méi)有搶到鎖的對(duì)象就進(jìn)入這個(gè)隊(duì)列。主要字段有:pre 前一個(gè)節(jié)點(diǎn),next 下一個(gè)節(jié)點(diǎn),thread 線程,waitStatus 線程的狀態(tài)。
- exclusiveOwnerThread:當(dāng)前搶到鎖的線程。
如下圖,簡(jiǎn)單的了解一下 AQS。
//繼承了 AQS
abstract static class Sync extends AbstractQueuedSynchronizer
//繼承了 Sync 類(lèi),定義一個(gè)非公平鎖的實(shí)現(xiàn)
static final class NonfairSync extends Sync
//繼承了 Sync 類(lèi),定義了一個(gè)公平鎖的實(shí)現(xiàn)
static final class FairSync extends Sync
公平鎖
在分析公平鎖之前,先介紹一下 Sync 類(lèi),它是 ReentrantLock 的唯一的屬性,在構(gòu)造函數(shù)中被初始化,決定了用公平鎖還是非公平鎖的方式獲取鎖。
private final Sync sync;
用以下構(gòu)造方法創(chuàng)建一個(gè)公平鎖。
Lock lock = new ReentrantLock(true);
沿著 lock.lock()
調(diào)用情況一路往下分析。
//FairSync.lock()
final void lock() {
// 將 1 作為參數(shù),調(diào)用 AQS 的 acquire 方法獲取鎖
acquire(1);
}
acquire() 方法主要是干了 3 件事情
- tryAcquire() 嘗試獲取鎖。
- 獲取鎖失敗后,調(diào)用 addWaiter() 方法將線程封裝成 Node,加入同步隊(duì)列。
- acquireQueued() 將隊(duì)列中的節(jié)點(diǎn)按自旋的方式嘗試獲取鎖。
//AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {
//嘗試獲取鎖,true:直接返回,false:調(diào)用 addWaiter()
// addWaiter() 將當(dāng)前線程封裝成節(jié)點(diǎn)
// acquireQueued() 將同步隊(duì)列中的節(jié)點(diǎn)循環(huán)嘗試獲取鎖
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire() 嘗試獲取鎖,如果線程本身持有鎖,則將這個(gè)線程重入鎖。
//FairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
//當(dāng)前線程
final Thread current = Thread.currentThread();
//AQS 中的狀態(tài)值
int c = getState();
//無(wú)線程占用鎖
if (c == 0) {
//當(dāng)前線程 用 cas 的方式設(shè)置狀態(tài)為 1
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//當(dāng)前線程成功設(shè)置 state 為 1,將當(dāng)前線程放入 AbstractOwnableSynchronizer 的 exclusiveOwnerThread 變量中
setExclusiveOwnerThread(current);
return true;
}
}
//當(dāng)前線程本來(lái)就持有鎖,進(jìn)入重入邏輯,返回 true
else if (current == getExclusiveOwnerThread()) {
//將 state + 1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 設(shè)置 state 變量,當(dāng)前線程持有鎖,不需要用 CAS 設(shè)置 state
setState(nextc);
return true;
}
//當(dāng)前線程獲取鎖失敗
return false;
}
hasQueuedPredecessors() 這個(gè)方法比較有紳士風(fēng)度,在 tryAcquire() 方法中被第一個(gè)調(diào)用,它謙讓比自己排隊(duì)長(zhǎng)的線程。
//AbstractQueuedSynchronizer.hasQueuedPredecessors()
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// 如果首節(jié)點(diǎn)獲取到了鎖,第二個(gè)節(jié)點(diǎn)不是當(dāng)前線程,返回 true,否則返回 false
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
addWaiter() 方法就是將獲取鎖失敗的線程加入到同步隊(duì)列尾部。
//AbstractOwnableSynchronizer.addWaiter()
private Node addWaiter(Node mode) {
//將當(dāng)前線程封裝成一個(gè)節(jié)點(diǎn)
Node node = new Node(Thread.currentThread(), mode);
//將新節(jié)點(diǎn)加入到同步隊(duì)列中
Node pred = tail;
if (pred != null) {
node.prev = pred;
// CAS 設(shè)置尾節(jié)點(diǎn)是新節(jié)點(diǎn)
if (compareAndSetTail(pred, node)) {
pred.next = node;
//返回新的節(jié)點(diǎn)
return node;
}
}
//沒(méi)有將尾節(jié)點(diǎn)設(shè)置為新節(jié)點(diǎn)
enq(node);
return node;
}
//AbstractQueuedSynchronizer.enq()
private Node enq(final Node node) {
//自旋
for (;;) {
//尾節(jié)點(diǎn)為 null,創(chuàng)建新的同步隊(duì)列
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
//尾節(jié)點(diǎn)不為 null,CAS方式將新節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)
node.prev = t;
if (compareAndSetTail(t, node)) {
//舊的尾節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)為新節(jié)點(diǎn)
t.next = node;
return t;
}
}
}
}
acquireQueued() 方法當(dāng)節(jié)點(diǎn)為首節(jié)點(diǎn)的時(shí)候,再次調(diào)用 tryAcquire() 獲取鎖,否則就阻塞線程,等待被喚醒。
//AbstractQueuedSynchronizer.acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
//失敗標(biāo)識(shí)
boolean failed = true;
try {
//中斷標(biāo)識(shí)
boolean interrupted = false;
//自旋
for (;;) {
//獲取前一個(gè)節(jié)點(diǎn)
final Node p = node.predecessor();
//如果前一個(gè)節(jié)點(diǎn)是首節(jié)點(diǎn),輪到當(dāng)前線程獲取鎖
//tryAcquire() 嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
//獲取鎖成功,將當(dāng)前節(jié)點(diǎn)設(shè)置為首節(jié)點(diǎn)
setHead(node);
//將前一個(gè)節(jié)點(diǎn)的 Next 設(shè)置為 null
p.next = null;
//獲取鎖成功
failed = false;
return interrupted;
}
//是否需要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
//阻塞的方法
parkAndCheckInterrupt())
//中斷標(biāo)識(shí)設(shè)為 true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire() 線程是否需要被阻塞,更改線程的 waitStatus 為 SIGNAL。parkAndCheckInterrupt() 實(shí)現(xiàn)真正的阻塞線程。
//AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前一個(gè)節(jié)點(diǎn)的 waitStatus 狀態(tài),默認(rèn)狀態(tài)為 0,第一次進(jìn)入必然走 else
int ws = pred.waitStatus;
// 第二次,直接返回 true
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//將waitStatus 狀態(tài)設(shè)置為 SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//AbstractQueuedSynchronizer.parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
//阻塞當(dāng)前線程
LockSupport.park(this);
return Thread.interrupted();
}
以上就是公平鎖獲取鎖的全部過(guò)程,總結(jié)一下公平鎖獲取鎖的過(guò)程:
- 當(dāng)前線程調(diào)用 tryAcquire() 獲取鎖,成功則返回。
- 調(diào)用 addWaiter(),將線程封裝成 Node 節(jié)點(diǎn)加入同步隊(duì)列。
- acquireQueued() 自旋嘗試獲取鎖,成功則返回。
- shouldParkAfterFailedAcquire() 將線程設(shè)置為等待喚醒狀態(tài),阻塞當(dāng)前線程。
- 如果線程被喚醒,嘗試獲取鎖,成功則返回,失敗則繼續(xù)阻塞。
非公平鎖
用默認(rèn)的構(gòu)造方式創(chuàng)建一個(gè)非公平鎖。lock() 方法上來(lái)就嘗試搶占鎖,失敗則調(diào)用 acquire() 方法。
//NonfairSync.lock()
final void lock() {
//CAS 設(shè)置 state 為 1
if (compareAndSetState(0, 1))
//將當(dāng)前線程放入 AbstractOwnableSynchronizer 的 exclusiveOwnerThread 變量中
setExclusiveOwnerThread(Thread.currentThread());
else
//設(shè)置失敗,參考上一節(jié)公平鎖的 acquire()
acquire(1);
}
nonfairTryAcquire() 就沒(méi)有紳士風(fēng)度了,沒(méi)有了公平鎖 hasQueuedPredecessors() 方法。
//NonfairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
//調(diào)用 ReentrantLock 的 nonfairTryAcquire()
return nonfairTryAcquire(acquires);
}
//ReentrantLock.nonfairTryAcquire()
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果 state 變量為 0,用 CAS 設(shè)置 state 的值
if (c == 0) {
if (compareAndSetState(0, acquires)) {
//將當(dāng)前線程放入 AbstractOwnableSynchronizer 的 exclusiveOwnerThread 變量中
setExclusiveOwnerThread(current);
return true;
}
}
//當(dāng)前線程本來(lái)就持有鎖,進(jìn)入重入邏輯,返回 true
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
//設(shè)置 state 變量
setState(nextc);
return true;
}
return false;
}
以上就是非公平鎖獲取鎖,總結(jié)一下非公平鎖獲取鎖的過(guò)程:
- lock() 第一次嘗試獲取鎖,成功則返回。
- nonfairTryAcquire() 再次嘗試獲取鎖。
- 失敗則調(diào)用 addWaiter() 封裝線程為 Node 節(jié)點(diǎn)加入同步隊(duì)列。
- acquireQueued() 自旋嘗試獲取鎖,成功則返回。
- shouldParkAfterFailedAcquire() 將線程設(shè)置為等待喚醒狀態(tài),阻塞當(dāng)前線程。
- 如果線程被喚醒,嘗試獲取鎖,成功則返回,失敗則繼續(xù)阻塞。
公平鎖和非公平鎖對(duì)比
在下圖源碼中可以看出,公平鎖多了一個(gè) !hasQueuedPredecessors()
用來(lái)判斷是否有其他線程比當(dāng)前線程在同步隊(duì)列中排隊(duì)時(shí)間更長(zhǎng)。除此之外,非公平鎖在初始時(shí)就有 2 次獲取鎖的機(jī)會(huì),然后再到同步隊(duì)列中排隊(duì)。
unlock() 釋放鎖
獲取鎖之后必須得釋放,同一個(gè)線程不管重入了幾次鎖,必須得釋放幾次鎖,不然 state 變量將不會(huì)變成 0,鎖被永久占用,其他線程將永遠(yuǎn)也獲取不到鎖。
//ReentrantLock.unlock()
public void unlock() {
sync.release(1);
}
//AbstractQueuedSynchronizer.release()
public final boolean release(int arg) {
//調(diào)用 Sync 的 tryRelease()
if (tryRelease(arg)) {
Node h = head;
//首節(jié)點(diǎn)不是 null,首節(jié)點(diǎn)的 waitStatus 是 SIGNAL
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//Sync.tryRelease()
protected final boolean tryRelease(int releases) {
//state 變量減去 1
int c = getState() - releases;
//當(dāng)前線程不是占有鎖的線程,異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//state 變量成了 0
if (c == 0) {
free = true;
//將當(dāng)前占有的線程設(shè)置為 null
setExclusiveOwnerThread(null);
}
//設(shè)置 state 變量
setState(c);
return free;
}
//AbstractQueuedSynchronizer.unparkSuccessor()
private void unparkSuccessor(Node node) {
//節(jié)點(diǎn)的 waitStatus
int ws = node.waitStatus;
//為 SIGNAL 的時(shí)候,CAS 的方式更新為初始值 0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//獲取下一個(gè)節(jié)點(diǎn)
Node s = node.next;
//下一個(gè)節(jié)點(diǎn)為空,或者 waitStatus 狀態(tài)為 CANCELLED
if (s == null || s.waitStatus > 0) {
s = null;
//從最后一個(gè)節(jié)點(diǎn)開(kāi)始找出 waitStatus 不是 CANCELLED 的節(jié)點(diǎn)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//下一個(gè)節(jié)點(diǎn)不是 null,喚醒它
if (s != null)
LockSupport.unpark(s.thread);
}
釋放鎖的邏輯就是 state 必須被減去 1 直到為 0,才可以喚醒下一個(gè)線程。
總結(jié)
ReentrantLock 主要是防止資源的使用沖突,保證同一個(gè)時(shí)間只能有一個(gè)線程在使用資源。比如:文件操作,同步發(fā)送消息等等。
本文分析了 ReentrantLock 的公平鎖和非公平鎖以及釋放鎖的原理,可以得出非公平鎖的效率比公平鎖效率高,非公平鎖初始時(shí)會(huì) 2 次獲取鎖,如果成功可以減少線程切換帶來(lái)的損耗。在非公平模式下,線程可能一直搶占不到鎖。
-
接口
+關(guān)注
關(guān)注
33文章
8718瀏覽量
152032 -
源碼
+關(guān)注
關(guān)注
8文章
653瀏覽量
29518 -
函數(shù)
+關(guān)注
關(guān)注
3文章
4346瀏覽量
63024 -
線程
+關(guān)注
關(guān)注
0文章
507瀏覽量
19764
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
平衡創(chuàng)新與倫理:AI時(shí)代的隱私保護(hù)和算法公平
![](https://file1.elecfans.com/web2/M00/83/CF/wKgaomRl8DiAZvIfAAGzCWDsfaE540.png)
#硬聲創(chuàng)作季 【Redis分布式鎖】從Redisson源碼剖析非公平加鎖機(jī)制
Lock體系結(jié)構(gòu)和讀寫(xiě)鎖機(jī)制解析
互斥量源碼分析測(cè)試
如何去獲取Arm Spinlock的公平性呢
具有隱私性的公平文檔交換協(xié)議
基于分層時(shí)間有色Petri網(wǎng)的支付協(xié)議公平性分析
鎖存器的原理分析
基于鄰近點(diǎn)算法的比例公平優(yōu)化方法
![基于鄰近點(diǎn)算法的比例<b class='flag-5'>公平</b>優(yōu)化方法](https://file.elecfans.com/web2/M00/49/51/poYBAGKhwJ-AK7HcAAARJn-ixpQ494.jpg)
基于公平心跳超時(shí)容錯(cuò)機(jī)制
公平高效機(jī)會(huì)干擾對(duì)齊算法
![<b class='flag-5'>公平</b>高效機(jī)會(huì)干擾對(duì)齊算法](https://file.elecfans.com/web2/M00/49/8A/poYBAGKhwMWAdd_fAAAUJeiif2Y893.jpg)
人工智能將會(huì)是新式的“電力”,公平教育時(shí)代正在到來(lái)
人工智能的算法公平性實(shí)現(xiàn)
![人工智能的算法<b class='flag-5'>公平</b>性實(shí)現(xiàn)](https://file.elecfans.com/web1/M00/AC/8D/pIYBAF3Ci8KARkQ6AAAjRgJo3yk177.jpg)
評(píng)論