一、CountDownLatch:
1、什么是 CountDownLatch:
CountDownLatch,閉鎖,就是一個(gè)基于 AQS 共享模式的同步計(jì)數(shù)器,它內(nèi)部的方法都是圍繞 AQS 實(shí)現(xiàn)的。主要作用是使一個(gè)或一組線程在其他線程執(zhí)行完畢之前,一直處于等待狀態(tài),直到其他線程執(zhí)行完成后再繼續(xù)執(zhí)行。
CountDownLatch 利用 AQS 的 state 變量充當(dāng)計(jì)數(shù)器(由 volatile 修飾并使用 CAS 進(jìn)行更新的),計(jì)數(shù)器的初始值就是線程的數(shù)量,每當(dāng)一個(gè)線程執(zhí)行完成,計(jì)數(shù)器的值就會(huì)減一,當(dāng)計(jì)數(shù)器的值為 0 時(shí),表示所有的線程都已經(jīng)完成任務(wù)了,那么接下來(lái)就喚醒在 CountDownLatch 上等待的線程執(zhí)行后面的任務(wù)。
那么當(dāng)計(jì)數(shù)器的值為 0 時(shí),主線程是如何被喚醒的呢?這就要從 CountDownLatch 的工作流程來(lái)說(shuō)明了,CountDownLatch 的工作流程可以看成在一開(kāi)始只在 CLH 隊(duì)列中放入一個(gè)主線程,然后不停的喚醒,喚醒之后如果發(fā)現(xiàn) state 還是不為0,則繼續(xù)等待。而主線程什么時(shí)候會(huì)被喚醒呢?
當(dāng)每個(gè)子線程執(zhí)行完畢的時(shí)候,會(huì)調(diào)用 countDown() 并基于 CAS 將計(jì)數(shù)器 state 的值減一,減一成功釋放資源后,就會(huì)調(diào)用 unparkSuccessor() 喚醒主線程,當(dāng)所有的子線程都執(zhí)行完了,也就是 state 為 0 時(shí),這時(shí)候主線程被喚醒之后就可以繼續(xù)執(zhí)行了。
state 被減成了 0 之后,就無(wú)法繼續(xù)使用這個(gè) CountDownLatch 了,需要重新 new 一個(gè),因?yàn)?state 的數(shù)量只有在初始化 CountDownLatch 的時(shí)候才可以設(shè)置,這也是 CountDownLatch 不可重用的原因。
2、CountDownLatch 的源碼簡(jiǎn)單說(shuō)明:
從代碼層面上來(lái)看,CountDownLatch 基于內(nèi)部類 Sync 實(shí)現(xiàn),而 Sync 繼承自 AQS。CountDownLatch 最主要有兩個(gè)方法:await() 和 countDown()
await(): 調(diào)用該方法的線程會(huì)被掛起,直到 CountDownLatch 計(jì)數(shù)器的值為 0 才繼續(xù)執(zhí)行,底層使用的是 AQS 的 tryAcquireShared()
countDown(): 用于減少計(jì)數(shù)器的數(shù)量,如果計(jì)數(shù)減為 0 的話,就會(huì)喚醒主線程,底層使用的是 AQS 的 releaseShared()
countDown() 方法詳細(xì)流程:
二、CyclicBarrier:
1、什么是CyclicBarrier:
CyclicBarrier,循環(huán)柵欄,通過(guò) CyclicBarrier 可以實(shí)現(xiàn)一組線程之間的相互等待,當(dāng)所有線程都到達(dá)屏障點(diǎn)之后再執(zhí)行后續(xù)的操作。通過(guò) await() 方法可以實(shí)現(xiàn)等待,當(dāng)最后一個(gè)線程執(zhí)行完,會(huì)使得所有在相應(yīng) CyclicBarrier 實(shí)例上的等待的線程被喚醒,而最后一個(gè)線程自身不會(huì)被暫停。
CyclicBarrier 沒(méi)有像 CountDownLatch 和 ReentrantLock 使用 AQS 的 state 變量,它是直接借助 ReentrantLock 加上 Condition 等待喚醒的功能進(jìn)而實(shí)現(xiàn)的。在構(gòu)建 CyclicBarrier 的時(shí)候,傳入的值會(huì)賦值給 CyclicBarrier 內(nèi)部維護(hù)的變量 count,同時(shí)也會(huì)賦值給 parties 變量(這是可以復(fù)用的關(guān)鍵)。
線程調(diào)用 await() 表示線程已經(jīng)到達(dá)柵欄,每次調(diào)用 await() 時(shí),會(huì)將 count 減一,操作 count 值是直接使用 ReentrantLock 來(lái)保證線程安全性的,如果 count 不為 0,則添加到 condition 隊(duì)列中,如果 count 等于 0,則把節(jié)點(diǎn)從 condition 隊(duì)列中移除并添加到 AQS 隊(duì)列中進(jìn)行全部喚醒,并且將 parties 的值重新賦值給 count 從而實(shí)現(xiàn)復(fù)用。
2、CyclicBarrier 的源碼分析:
(1)成員變量:
//同步操作鎖 private?final?ReentrantLock?lock?=?new?ReentrantLock(); //線程攔截器 private?final?Condition?trip?=?lock.newCondition(); //每次攔截的線程數(shù) private?final?int?parties; //換代前執(zhí)行的任務(wù) private?final?Runnable?barrierCommand; //表示柵欄的當(dāng)前代 private?Generation?generation?=?new?Generation(); //計(jì)數(shù)器 private?int?count; ? //靜態(tài)內(nèi)部類Generation private?static?class?Generation?{ ??boolean?broken?=?false; }
CyclicBarrier 是通過(guò)獨(dú)占鎖實(shí)現(xiàn)的,底層包含了 “ReentrantLock 對(duì)象 lock” 和 “Condition 對(duì)象 trip”,通過(guò)條件隊(duì)列 trip 來(lái)對(duì)線程進(jìn)行阻塞的,并且其內(nèi)部維護(hù)了兩個(gè) int 型的變量 parties 和 count:
parties 表示每次攔截的線程數(shù),該值在構(gòu)造時(shí)進(jìn)行賦值,用于實(shí)現(xiàn) CyclicBarrier 的復(fù)用;
count 是內(nèi)部計(jì)數(shù)器,它的初始值和 parties 相同,以后隨著每次 await 方法的調(diào)用而減 1,直到減為 0 就將所有線程喚醒。
CyclicBarrier 有一個(gè)靜態(tài)內(nèi)部類 Generation,該類的對(duì)象代表柵欄的當(dāng)前代,利用它可以實(shí)現(xiàn)循環(huán)等待,當(dāng) count 減為 0 會(huì)將所有阻塞的線程喚醒,并設(shè)置成下一代。
barrierCommand 表示換代前執(zhí)行的任務(wù),在喚醒所有線程前可以通過(guò) barrierCommand 來(lái)執(zhí)行指定的任務(wù)
(2)await() 方法:
CyclicBarrier 類最主要的功能就是使先到達(dá)屏障點(diǎn)的線程阻塞并等待后面的線程,其中它提供了兩種等待的方法,分別是定時(shí)等待和非定時(shí)等待。
//非定時(shí)等待 public?int?await()?throws?InterruptedException,?BrokenBarrierException?{ ??try?{ ????return?dowait(false,?0L); ??}?catch?(TimeoutException?toe)?{ ????throw?new?Error(toe); ??} } ? //定時(shí)等待 public?int?await(long?timeout,?TimeUnit?unit)?throws?InterruptedException,?BrokenBarrierException,?TimeoutException?{ ??return?dowait(true,?unit.toNanos(timeout)); }
BrokenBarrierException 表示柵欄已經(jīng)被破壞,破壞的原因可能是其中一個(gè)線程 await() 時(shí)被中斷或者超時(shí)。
可以看到不管是定時(shí)等待還是非定時(shí)等待,它們都調(diào)用了 dowait() 方法,只不過(guò)是傳入的參數(shù)不同而已,下面我們就來(lái)看看 dowait() 方法都做了些什么。
//核心等待方法 private?int?dowait(boolean?timed,?long?nanos)?throws?InterruptedException,?BrokenBarrierException,?TimeoutException?{ ??//?顯示鎖 ??final?ReentrantLock?lock?=?this.lock; ??lock.lock(); ??try?{ ????final?Generation?g?=?generation; ????//檢查當(dāng)前柵欄是否被打翻 ????if?(g.broken)?{ ??????throw?new?BrokenBarrierException(); ????} ????//檢查當(dāng)前線程是否被中斷 ????if?(Thread.interrupted())?{ ??????//如果當(dāng)前線程被中斷會(huì)做以下三件事 ??????//1.打翻當(dāng)前柵欄 ??????//2.喚醒攔截的所有線程 ??????//3.拋出中斷異常 ??????breakBarrier(); ??????throw?new?InterruptedException(); ????} ????//每次都將計(jì)數(shù)器的值減1 ????int?index?=?--count; ????//計(jì)數(shù)器的值減為0則需喚醒所有線程并轉(zhuǎn)換到下一代 ????if?(index?==?0)?{ ??????boolean?ranAction?=?false; ??????try?{ ????????//喚醒所有線程前先執(zhí)行指定的任務(wù) ????????final?Runnable?command?=?barrierCommand; ????????if?(command?!=?null)?{ ??????????command.run(); ????????} ????????ranAction?=?true; ????????//喚醒所有線程并轉(zhuǎn)到下一代 ????????nextGeneration(); ????????return?0; ??????}?finally?{ ????????//確保在任務(wù)未成功執(zhí)行時(shí)能將所有線程喚醒 ????????if?(!ranAction)?{ ??????????breakBarrier(); ????????} ??????} ????} ? ????//如果計(jì)數(shù)器不為0則執(zhí)行此循環(huán) ????for?(;;)?{ ??????try?{ ????????//根據(jù)傳入的參數(shù)來(lái)決定是定時(shí)等待還是非定時(shí)等待 ????????if?(!timed)?{ ??????????trip.await(); ????????}else?if?(nanos?>?0L)?{ ??????????nanos?=?trip.awaitNanos(nanos); ????????} ??????}?catch?(InterruptedException?ie)?{ ????????//若當(dāng)前線程在等待期間被中斷則打翻柵欄喚醒其他線程 ????????if?(g?==?generation?&&?!?g.broken)?{ ??????????breakBarrier(); ??????????throw?ie; ????????}?else?{ ??????????//若在捕獲中斷異常前已經(jīng)完成在柵欄上的等待,?則直接調(diào)用中斷操作 ??????????Thread.currentThread().interrupt(); ????????} ??????} ??????//如果線程因?yàn)榇蚍瓥艡诓僮鞫粏拘褎t拋出異常 ??????if?(g.broken)?{ ????????throw?new?BrokenBarrierException(); ??????} ??????//如果線程因?yàn)閾Q代操作而被喚醒則返回計(jì)數(shù)器的值 ??????if?(g?!=?generation)?{ ????????return?index; ??????} ??????//如果線程因?yàn)闀r(shí)間到了而被喚醒則打翻柵欄并拋出異常 ??????if?(timed?&&?nanos?<=?0L)?{ ????????breakBarrier(); ????????throw?new?TimeoutException(); ??????} ????} ??}?finally?{ ????lock.unlock(); ??} }
上面執(zhí)行的代碼相對(duì)比較容易看懂,我們?cè)賮?lái)看一下執(zhí)行流程:
執(zhí)行 dowait() 方法時(shí),先獲得顯示鎖,判斷當(dāng)前線程狀態(tài)是否被中斷,如果是,則執(zhí)行 breakBarrier() 方法,喚醒之前阻塞的所有線程,并將計(jì)數(shù)器重置,否則,往下執(zhí)行;
計(jì)數(shù)器 count 減 1,如果 count == 0,表示最后一個(gè)線程達(dá)到柵欄,接著執(zhí)行之前指定的 Runnable 接口,同時(shí)執(zhí)行 nextGeneration() 方法進(jìn)入下一代;
否則,進(jìn)入自旋,判斷當(dāng)前線程是進(jìn)入定時(shí)等待還是非定時(shí)等待,如果在等待過(guò)程中被中斷,執(zhí)行 breakBarrier() 方法,喚醒之前阻塞的所有線程;
判斷是否是因?yàn)閳?zhí)行 breakBarrier() 方法而被喚醒,如果是,則拋出異常;
判斷是否是正常的換代操作而被喚醒,如果是,則返回計(jì)數(shù)器的值;
判斷是否是超時(shí)而被喚醒,如果是,則喚醒之前阻塞的所有線程,并拋出異常;
釋放鎖。
(3)breakBarrier() 方法:
private?void?breakBarrier()?{ ?generation.broken?=?true;//柵欄被打破 ?count?=?parties;//重置count ?trip.signalAll();//喚醒之前阻塞的線程 }
(4)nextGeneration() 方法:
private?void?nextGeneration()?{ ?//喚醒所以的線程 ?trip.signalAll(); ?//重置計(jì)數(shù)器 ?count?=?parties; ?//重新開(kāi)始 ?generation?=?new?Generation(); }
(5)reset()方法:
//?重置barrier到初始狀態(tài),所有還在等待中的線程最終會(huì)拋出BrokenBarrierException。 public?void?reset()?{ ?final?ReentrantLock?lock?=?this.lock; ????lock.lock(); ????try?{ ?????breakBarrier();???//?break?the?current?generation ????????nextGeneration();?//?start?a?new?generation ????}?finally?{ ?????lock.unlock(); ????} }
三、Semaphore:
1、什么是 Semaphore:
Semaphore 信號(hào)量,主要用于控制并發(fā)訪問(wèn)共享資源的線程數(shù)量,底層基于 AQS 共享模式,并依賴 AQS 的變量 state 作為許可證 permit,通過(guò)控制許可證的數(shù)量,來(lái)保證線程之間的配合。線程使用 acquire() 獲取訪問(wèn)許可,只有拿到 “許可證” 后才能繼續(xù)運(yùn)行,當(dāng) Semaphore 的 permit 不為 0 的時(shí)候,對(duì)請(qǐng)求資源的線程放行,同時(shí) permit 的值減1,當(dāng) permit 的值為 0 時(shí),那么請(qǐng)求資源的線程會(huì)被阻塞直到其他線程釋放訪問(wèn)許可,當(dāng)線程對(duì)共享資源操作完成后,使用 release() 歸還訪問(wèn)許可。
不同于 CyclicBarrier 和 ReentrantLock,Semaphore 不會(huì)使用到 AQS 的 Condition 條件隊(duì)列,都是在 CLH 同步隊(duì)列中操作,只是當(dāng)前線程會(huì)被 park。另外 Semaphore 是不可重入的。
2、Semaphore 的公平和非公平兩種模式:
Semaphore 通過(guò)自定義兩種不同的同步器(FairSync 和 NonfairSync)提供了公平和非公平兩種工作模式,兩種模式下分別提供了限時(shí)/不限時(shí)、響應(yīng)中斷/不響應(yīng)中斷的獲取資源的方法(限時(shí)獲取總是及時(shí)響應(yīng)中斷的),而所有的釋放資源的 release() 操作是統(tǒng)一的。
公平模式: 遵循 FIFO,調(diào)用 acquire() 方法獲取許可證的順序時(shí),先判斷同步隊(duì)列中是不是存在其他的等待線程,如果存在就將請(qǐng)求線程封裝成 Node 結(jié)點(diǎn)加入同步隊(duì)列,從而保證每個(gè)線程獲取同步狀態(tài)都是按照先到先得的順序執(zhí)行的,否則對(duì) state 值進(jìn)行減操作并返回剩下的信號(hào)量
非公平模式: 是搶占式的,通過(guò)競(jìng)爭(zhēng)的方式獲取,不管同步隊(duì)列中是否存在等待線程,有可能一個(gè)新的獲取線程恰好在一個(gè)許可證釋放時(shí)得到了這個(gè)許可證,而前面還有等待的線程。
框架流程圖如下:
3、嘗試獲取資源 acquire()方法的執(zhí)行流程圖:
編輯:黃飛
?
評(píng)論