詳解Java并發包基石AQS
AQS是一個用來構建鎖和同步器的框架,使用AQS能簡單且高效地構造出應用廣泛的大量的同步器,比如我們提到的ReentrantLock,Semaphore,其他的諸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。當然,我們自己也能利用AQS非常輕松容易地構造出符合我們自己需求的同步器。
本章我們就一起探究下這個神奇的東東,并對其實現原理進行剖析理解
二、基本實現原理AQS使用一個int成員變量來表示同步狀態,通過內置的FIFO隊列來完成獲取資源線程的排隊工作。
private volatile int state;//共享變量,使用volatile修飾保證線程可見性
狀態信息通過procted類型的getState,setState,compareAndSetState進行操作
AQS支持兩種同步方式:
1.獨占式
2.共享式
這樣方便使用者實現不同類型的同步組件,獨占式如ReentrantLock,共享式如Semaphore,CountDownLatch,組合式的如ReentrantReadWriteLock。總之,AQS為使用提供了底層支撐,如何組裝實現,使用者可以自由發揮。
同步器的設計是基于模板方法模式的,一般的使用方式是這樣:
1.使用者繼承AbstractQueuedSynchronizer并重寫指定的方法。(這些重寫方法很簡單,無非是對于共享資源state的獲取和釋放)
2.將AQS組合在自定義同步組件的實現中,并調用其模板方法,而這些模板方法會調用使用者重寫的方法。
這其實是模板方法模式的一個很經典的應用。
我們來看看AQS定義的這些可重寫的方法:
protected boolean tryAcquire(int arg) : 獨占式獲取同步狀態,試著獲取,成功返回true,反之為false protected boolean tryRelease(int arg) :獨占式釋放同步狀態,等待中的其他線程此時將有機會獲取到同步狀態; protected int tryAcquireShared(int arg) :共享式獲取同步狀態,返回值大于等于0,代表獲取成功;反之獲取失敗; protected boolean tryReleaseShared(int arg) :共享式釋放同步狀態,成功為true,失敗為false protected boolean isHeldExclusively() : 是否在獨占模式下被線程占用。關于AQS的使用,我們來簡單總結一下:
2.1、如何使用首先,我們需要去繼承AbstractQueuedSynchronizer這個類,然后我們根據我們的需求去重寫相應的方法,比如要實現一個獨占鎖,那就去重寫tryAcquire,tryRelease方法,要實現共享鎖,就去重寫tryAcquireShared,tryReleaseShared;最后,在我們的組件中調用AQS中的模板方法就可以了,而這些模板方法是會調用到我們之前重寫的那些方法的。也就是說,我們只需要很小的工作量就可以實現自己的同步組件,重寫的那些方法,僅僅是一些簡單的對于共享資源state的獲取和釋放操作,至于像是獲取資源失敗,線程需要阻塞之類的操作,自然是AQS幫我們完成了。
2.2、設計思想對于使用者來講,我們無需關心獲取資源失敗,線程排隊,線程阻塞/喚醒等一系列復雜的實現,這些都在AQS中為我們處理好了。我們只需要負責好自己的那個環節就好,也就是獲取/釋放共享資源state的姿勢T_T。很經典的模板方法設計模式的應用,AQS為我們定義好頂級邏輯的骨架,并提取出公用的線程入隊列/出隊列,阻塞/喚醒等一系列復雜邏輯的實現,將部分簡單的可由使用者決定的操作邏輯延遲到子類中去實現即可。
三、自定義同步器3.1、同步器代碼實現上面大概講了一些關于AQS如何使用的理論性的東西,接下來,我們就來看下實際如何使用,直接采用JDK官方文檔中的小例子來說明問題
package juc;import java.util.concurrent.locks.AbstractQueuedSynchronizer;public class Mutex implements java.io.Serializable { //靜態內部類,繼承AQS private static class Sync extends AbstractQueuedSynchronizer {//是否處于占用狀態protected boolean isHeldExclusively() { return getState() == 1;}//當狀態為0的時候獲取鎖,CAS操作成功,則state狀態為1,public boolean tryAcquire(int acquires) { if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true; } return false;}//釋放鎖,將同步狀態置為0protected boolean tryRelease(int releases) { if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true;} }//同步對象完成一系列復雜的操作,我們僅需指向它即可private final Sync sync = new Sync();//加鎖操作,代理到acquire(模板方法)上就行,acquire會調用我們重寫的tryAcquire方法public void lock() { sync.acquire(1);}public boolean tryLock() { return sync.tryAcquire(1);}//釋放鎖,代理到release(模板方法)上就行,release會調用我們重寫的tryRelease方法。public void unlock() { sync.release(1);}public boolean isLocked() { return sync.isHeldExclusively();}}3.2、同步器代碼測試
測試下這個自定義的同步器,我們使用之前文章中做過的并發環境下a++的例子來說明問題(a++的原子性其實最好使用原子類AtomicInteger來解決,此處用Mutex有點大炮打蚊子的意味,好在能說明問題就好)
package juc;import java.util.concurrent.CyclicBarrier;public class TestMutex { private static CyclicBarrier barrier = new CyclicBarrier(31); private static int a = 0; private static Mutex mutex = new Mutex(); public static void main(String []args) throws Exception {//說明:我們啟用30個線程,每個線程對i自加10000次,同步正常的話,最終結果應為300000;//未加鎖前for(int i=0;i<30;i++){ Thread t = new Thread(new Runnable() {@Overridepublic void run() { for(int i=0;i<10000;i++){increment1();//沒有同步措施的a++; } try {barrier.await();//等30個線程累加完畢 } catch (Exception e) {e.printStackTrace(); }} }); t.start();}barrier.await();System.out.println('加鎖前,a='+a);//加鎖后barrier.reset();//重置CyclicBarriera=0;for(int i=0;i<30;i++){ new Thread(new Runnable() {@Overridepublic void run() { for(int i=0;i<10000;i++){increment2();//a++采用Mutex進行同步處理 } try {barrier.await();//等30個線程累加完畢 } catch (Exception e) {e.printStackTrace(); }} }).start();}barrier.await();System.out.println('加鎖后,a='+a); } /** * 沒有同步措施的a++ * @return */ public static void increment1(){a++; } /** * 使用自定義的Mutex進行同步處理的a++ */ public static void increment2(){mutex.lock();a++;mutex.unlock(); }}
測試結果:
加鎖前,a=279204加鎖后,a=300000
四、源碼分析我們先來簡單描述下AQS的基本實現,前面我們提到過,AQS維護一個共享資源state,通過內置的FIFO來完成獲取資源線程的排隊工作。(這個內置的同步隊列稱為'CLH'隊列)。該隊列由一個一個的Node結點組成,每個Node結點維護一個prev引用和next引用,分別指向自己的前驅和后繼結點。AQS維護兩個指針,分別指向隊列頭部head和尾部tail。
其實就是個雙端雙向鏈表。
當線程獲取資源失敗(比如tryAcquire時試圖設置state狀態失敗),會被構造成一個結點加入CLH隊列中,同時當前線程會被阻塞在隊列中(通過LockSupport.park實現,其實是等待態)。當持有同步狀態的線程釋放同步狀態時,會喚醒后繼結點,然后此結點線程繼續加入到對同步狀態的爭奪中。
4.1、Node結點Node結點是AbstractQueuedSynchronizer中的一個靜態內部類,我們撿Node的幾個重要屬性來說一下
static final class Node {/** waitStatus值,表示線程已被取消(等待超時或者被中斷)*/static final int CANCELLED = 1;/** waitStatus值,表示后繼線程需要被喚醒(unpaking)*/static final int SIGNAL = -1;/**waitStatus值,表示結點線程等待在condition上,當被signal后,會從等待隊列轉移到同步到隊列中 *//** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2; /** waitStatus值,表示下一次共享式同步狀態會被無條件地傳播下去static final int PROPAGATE = -3;/** 等待狀態,初始為0 */volatile int waitStatus;/**當前結點的前驅結點 */volatile Node prev;/** 當前結點的后繼結點 */volatile Node next;/** 與當前結點關聯的排隊中的線程 */volatile Thread thread;/** ...... */ }4.2、獨占式
獲取同步狀態--acquire()
來看看acquire方法,lock方法一般會直接代理到acquire上
public final void acquire(int arg) { if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
我們來簡單理一下代碼邏輯:
a.首先,調用使用者重寫的tryAcquire方法,若返回true,意味著獲取同步狀態成功,后面的邏輯不再執行;若返回false,也就是獲取同步狀態失敗,進入b步驟;
b.此時,獲取同步狀態失敗,構造獨占式同步結點,通過addWatiter將此結點添加到同步隊列的尾部(此時可能會有多個線程結點試圖加入同步隊列尾部,需要以線程安全的方 式添加);
c.該結點以在隊列中嘗試獲取同步狀態,若獲取不到,則阻塞結點線程,直到被前驅結點喚醒或者被中斷。
addWaiter
為獲取同步狀態失敗的線程,構造成一個Node結點,添加到同步隊列尾部
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);//構造結點 //指向尾結點tail Node pred = tail; //如果尾結點不為空,CAS快速嘗試在尾部添加,若CAS設置成功,返回;否則,eng。 if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) { pred.next = node; return node;} } enq(node); return node;}
先cas快速設置,若失敗,進入enq方法
將結點添加到同步隊列尾部這個操作,同時可能會有多個線程嘗試添加到尾部,是非線程安全的操作。
以上代碼可以看出,使用了compareAndSetTail這個cas操作保證安全添加尾結點。
enq方法
private Node enq(final Node node) { for (;;) {Node t = tail;if (t == null) { //如果隊列為空,創建結點,同時被head和tail引用 if (compareAndSetHead(new Node()))tail = head;} else { node.prev = t; if (compareAndSetTail(t, node)) {//cas設置尾結點,不成功就一直重試t.next = node;return t; }} }}
enq內部是個死循環,通過CAS設置尾結點,不成功就一直重試。很經典的CAS自旋的用法,我們在之前關于原子類的源碼分析中也提到過。這是一種樂觀的并發策略。
最后,看下acquireQueued方法
acquireQueued
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try {boolean interrupted = false;for (;;) {//死循環 final Node p = node.predecessor();//找到當前結點的前驅結點 if (p == head && tryAcquire(arg)) {//如果前驅結點是頭結點,才tryAcquire,其他結點是沒有機會tryAcquire的。setHead(node);//獲取同步狀態成功,將當前結點設置為頭結點。p.next = null; // 方便GCfailed = false;return interrupted; } // 如果沒有獲取到同步狀態,通過shouldParkAfterFailedAcquire判斷是否應該阻塞,parkAndCheckInterrupt用來阻塞線程 if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;} } finally {if (failed) cancelAcquire(node); }}
acquireQueued內部也是一個死循環,只有前驅結點是頭結點的結點,也就是老二結點,才有機會去tryAcquire;若tryAcquire成功,表示獲取同步狀態成功,將此結點設置為頭結點;若是非老二結點,或者tryAcquire失敗,則進入shouldParkAfterFailedAcquire去判斷判斷當前線程是否應該阻塞,若可以,調用parkAndCheckInterrupt阻塞當前線程,直到被中斷或者被前驅結點喚醒。若還不能休息,繼續循環。
shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //獲取前驅結點的wait值 int ws = pred.waitStatus; if (ws == Node.SIGNAL)//若前驅結點的狀態是SIGNAL,意味著當前結點可以被安全地parkreturn true; if (ws > 0) { // ws>0,只有CANCEL狀態ws才大于0。若前驅結點處于CANCEL狀態,也就是此結點線程已經無效,從后往前遍歷,找到一個非CANCEL狀態的結點,將自己設置為它的后繼結點do { node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node; } else { // 若前驅結點為其他狀態,將其設置為SIGNAL狀態compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}
若shouldParkAfterFailedAcquire返回true,也就是當前結點的前驅結點為SIGNAL狀態,則意味著當前結點可以放心休息,進入parking狀態了。parkAncCheckInterrupt阻塞線程并處理中斷。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this);//使用LockSupport使線程進入阻塞狀態 return Thread.interrupted();// 線程是否被中斷過}
至此,關于acquire的方法源碼已經分析完畢,我們來簡單總結下
a.首先tryAcquire獲取同步狀態,成功則直接返回;否則,進入下一環節;
b.線程獲取同步狀態失敗,就構造一個結點,加入同步隊列中,這個過程要保證線程安全;
c.加入隊列中的結點線程進入自旋狀態,若是老二結點(即前驅結點為頭結點),才有機會嘗試去獲取同步狀態;否則,當其前驅結點的狀態為SIGNAL,線程便可安心休息,進入阻塞狀態,直到被中斷或者被前驅結點喚醒。
釋放同步狀態--release()
當前線程執行完自己的邏輯之后,需要釋放同步狀態,來看看release方法的邏輯
public final boolean release(int arg) { if (tryRelease(arg)) {//調用使用者重寫的tryRelease方法,若成功,喚醒其后繼結點,失敗則返回falseNode h = head;if (h != null && h.waitStatus != 0) unparkSuccessor(h);//喚醒后繼結點return true; } return false;}
unparkSuccessor:喚醒后繼結點
private void unparkSuccessor(Node node) { //獲取wait狀態 int ws = node.waitStatus; if (ws < 0)compareAndSetWaitStatus(node, ws, 0);// 將等待狀態waitStatus設置為初始值0 Node s = node.next;//后繼結點 if (s == null || s.waitStatus > 0) {//若后繼結點為空,或狀態為CANCEL(已失效),則從后尾部往前遍歷找到一個處于正常阻塞狀態的結點 進行喚醒s = null;for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0)s = t; } if (s != null)LockSupport.unpark(s.thread);//使用LockSupprot喚醒結點對應的線程}
release的同步狀態相對簡單,需要找到頭結點的后繼結點進行喚醒,若后繼結點為空或處于CANCEL狀態,從后向前遍歷找尋一個正常的結點,喚醒其對應線程。
4.3、共享式共享式:共享式地獲取同步狀態。對于獨占式同步組件來講,同一時刻只有一個線程能獲取到同步狀態,其他線程都得去排隊等待,其待重寫的嘗試獲取同步狀態的方法tryAcquire返回值為boolean,這很容易理解;對于共享式同步組件來講,同一時刻可以有多個線程同時獲取到同步狀態,這也是“共享”的意義所在。其待重寫的嘗試獲取同步狀態的方法tryAcquireShared返回值為int。
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}
1.當返回值大于0時,表示獲取同步狀態成功,同時還有剩余同步狀態可供其他線程獲取;
2.當返回值等于0時,表示獲取同步狀態成功,但沒有可用同步狀態了;
3.當返回值小于0時,表示獲取同步狀態失敗。
獲取同步狀態--acquireShared
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0)//返回值小于0,獲取同步狀態失敗,排隊去;獲取同步狀態成功,直接返回去干自己的事兒。doAcquireShared(arg);}
doAcquireShared
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED);//構造一個共享結點,添加到同步隊列尾部。若隊列初始為空,先添加一個無意義的傀儡結點,再將新節點添加到隊列尾部。 boolean failed = true;//是否獲取成功 try {boolean interrupted = false;//線程parking過程中是否被中斷過for (;;) {//死循環 final Node p = node.predecessor();//找到前驅結點 if (p == head) {//頭結點持有同步狀態,只有前驅是頭結點,才有機會嘗試獲取同步狀態int r = tryAcquireShared(arg);//嘗試獲取同步裝填if (r >= 0) {//r>=0,獲取成功 setHeadAndPropagate(node, r);//獲取成功就將當前結點設置為頭結點,若還有可用資源,傳播下去,也就是繼續喚醒后繼結點 p.next = null; // 方便GC if (interrupted)selfInterrupt(); failed = false; return;} } if (shouldParkAfterFailedAcquire(p, node) &&//是否能安心進入parking狀態parkAndCheckInterrupt())//阻塞線程interrupted = true;} } finally {if (failed) cancelAcquire(node); }}
大體邏輯與獨占式的acquireQueued差距不大,只不過由于是共享式,會有多個線程同時獲取到線程,也可能同時釋放線程,空出很多同步狀態,所以當排隊中的老二獲取到同步狀態,如果還有可用資源,會繼續傳播下去。
setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared()) doReleaseShared(); }}
釋放同步狀態--releaseShared
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {doReleaseShared();//釋放同步狀態return true; } return false;}
doReleaseShared
private void doReleaseShared() { for (;;) {//死循環,共享模式,持有同步狀態的線程可能有多個,采用循環CAS保證線程安全Node h = head;if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h);//喚醒后繼結點 } else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;}if (h == head) break; }}
代碼邏輯比較容易理解,需要注意的是,共享模式,釋放同步狀態也是多線程的,此處采用了CAS自旋來保證。
五、總結關于AQS的介紹及源碼分析到此為止了。
AQS是JUC中很多同步組件的構建基礎,簡單來講,它內部實現主要是狀態變量state和一個FIFO隊列來完成,同步隊列的頭結點是當前獲取到同步狀態的結點,獲取同步狀態state失敗的線程,會被構造成一個結點(或共享式或獨占式)加入到同步隊列尾部(采用自旋CAS來保證此操作的線程安全),隨后線程會阻塞;釋放時喚醒頭結點的后繼結點,使其加入對同步狀態的爭奪中。
AQS為我們定義好了頂層的處理實現邏輯,我們在使用AQS構建符合我們需求的同步組件時,只需重寫tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared幾個方法,來決定同步狀態的釋放和獲取即可,至于背后復雜的線程排隊,線程阻塞/喚醒,如何保證線程安全,都由AQS為我們完成了,這也是非常典型的模板方法的應用。AQS定義好頂級邏輯的骨架,并提取出公用的線程入隊列/出隊列,阻塞/喚醒等一系列復雜邏輯的實現,將部分簡單的可由使用者決定的操作邏輯延遲到子類中去實現。
以上就是詳解Java并發包基石AQS的詳細內容,更多關于Java并發包基石AQS的資料請關注好吧啦網其它相關文章!
相關文章: