• <menu id="gyiem"><menu id="gyiem"></menu></menu>
  • <menu id="gyiem"><code id="gyiem"></code></menu>

    Java進階(四)線程間通信剖析

    原創文章,轉載請務必將下面這段話置于文章開頭處(保留超鏈接)。
    本文轉發自技術世界原文鏈接 http://www.luozeyang.com/java/thread_communication/

    CountDownLatch

    CountDownLatch適用場景

    Java多線程編程中經常會碰到這樣一種場景——某個線程需要等待一個或多個線程操作結束(或達到某種狀態)才開始執行。比如開發一個并發測試工具時,主線程需要等到所有測試線程均執行完成再開始統計總共耗費的時間,此時可以通過CountDownLatch輕松實現。

    CountDownLatch實例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    package com.test.thread;

    import java.util.Date;
    import java.util.concurrent.CountDownLatch;

    public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
    int totalThread = 3;
    long start = System.currentTimeMillis();
    CountDownLatch countDown = new CountDownLatch(totalThread);
    for(int i = 0; i < totalThread; i++) {
    final String threadName = "Thread " + i;
    new Thread(() -> {
    System.out.println(String.format("%s\t%s %s", new Date(), threadName, "started"));
    try {
    Thread.sleep(1000);
    } catch (Exception ex) {
    ex.printStackTrace();
    }
    System.out.println(String.format("%s\t%s %s", new Date(), threadName, "ended"));
    countDown.countDown();
    }).start();;
    }
    countDown.await();
    long stop = System.currentTimeMillis();
    System.out.println(String.format("Total time : %sms", (stop - start)));
    }
    }

    執行結果

    1
    2
    3
    4
    5
    6
    7
    Sun Jun 19 20:34:31 CST 2016  Thread 1 started
    Sun Jun 19 20:34:31 CST 2016 Thread 0 started
    Sun Jun 19 20:34:31 CST 2016 Thread 2 started
    Sun Jun 19 20:34:32 CST 2016 Thread 2 ended
    Sun Jun 19 20:34:32 CST 2016 Thread 1 ended
    Sun Jun 19 20:34:32 CST 2016 Thread 0 ended
    Total time : 1072ms

    可以看到,主線程等待所有3個線程都執行結束后才開始執行。

    CountDownLatch主要接口分析

    CountDownLatch工作原理相對簡單,可以簡單看成一個倒計數器,在構造方法中指定初始值,每次調用countDown()方法時將計數器減1,而await()會等待計數器變為0。CountDownLatch關鍵接口如下

    • countDown() 如果當前計數器的值大于1,則將其減1;若當前值為1,則將其置為0并喚醒所有通過await等待的線程;若當前值為0,則什么也不做直接返回。
    • await() 等待計數器的值為0,若計數器的值為0則該方法返回;若等待期間該線程被中斷,則拋出InterruptedException并清除該線程的中斷狀態。
    • await(long timeout, TimeUnit unit) 在指定的時間內等待計數器的值為0,若在指定時間內計數器的值變為0,則該方法返回true;若指定時間內計數器的值仍未變為0,則返回false;若指定時間內計數器的值變為0之前當前線程被中斷,則拋出InterruptedException并清除該線程的中斷狀態。
    • getCount() 讀取當前計數器的值,一般用于調試或者測試。

    CyclicBarrier

    CyclicBarrier適用場景

    在《當我們說線程安全時,到底在說什么》一文中講過內存屏障,它能保證屏障之前的代碼一定在屏障之后的代碼之前被執行。CyclicBarrier可以譯為循環屏障,也有類似的功能。CyclicBarrier可以在構造時指定需要在屏障前執行await的個數,所有對await的調用都會等待,直到調用await的次數達到預定指,所有等待都會立即被喚醒。

    從使用場景上來說,CyclicBarrier是讓多個線程互相等待某一事件的發生,然后同時被喚醒。而上文講的CountDownLatch是讓某一線程等待多個線程的狀態,然后該線程被喚醒。

    CyclicBarrier實例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    package com.test.thread;

    import java.util.Date;
    import java.util.concurrent.CyclicBarrier;

    public class CyclicBarrierDemo {

    public static void main(String[] args) {
    int totalThread = 5;
    CyclicBarrier barrier = new CyclicBarrier(totalThread);

    for(int i = 0; i < totalThread; i++) {
    String threadName = "Thread " + i;
    new Thread(() -> {
    System.out.println(String.format("%s\t%s %s", new Date(), threadName, " is waiting"));
    try {
    barrier.await();
    } catch (Exception ex) {
    ex.printStackTrace();
    }
    System.out.println(String.format("%s\t%s %s", new Date(), threadName, "ended"));
    }).start();
    }
    }
    }

    執行結果如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    Sun Jun 19 21:04:49 CST 2016  Thread 1  is waiting
    Sun Jun 19 21:04:49 CST 2016 Thread 0 is waiting
    Sun Jun 19 21:04:49 CST 2016 Thread 3 is waiting
    Sun Jun 19 21:04:49 CST 2016 Thread 2 is waiting
    Sun Jun 19 21:04:49 CST 2016 Thread 4 is waiting
    Sun Jun 19 21:04:49 CST 2016 Thread 4 ended
    Sun Jun 19 21:04:49 CST 2016 Thread 0 ended
    Sun Jun 19 21:04:49 CST 2016 Thread 2 ended
    Sun Jun 19 21:04:49 CST 2016 Thread 1 ended
    Sun Jun 19 21:04:49 CST 2016 Thread 3 ended

    從執行結果可以看到,每個線程都不會在其它所有線程執行await()方法前繼續執行,而等所有線程都執行await()方法后所有線程的等待都被喚醒從而繼續執行。

    CyclicBarrier主要接口分析

    CyclicBarrier提供的關鍵方法如下

    • await() 等待其它參與方的到來(調用await())。如果當前調用是最后一個調用,則喚醒所有其它的線程的等待并且如果在構造CyclicBarrier時指定了action,當前線程會去執行該action,然后該方法返回該線程調用await的次序(getParties()-1說明該線程是第一個調用await的,0說明該線程是最后一個執行await的),接著該線程繼續執行await后的代碼;如果該調用不是最后一個調用,則阻塞等待;如果等待過程中,當前線程被中斷,則拋出InterruptedException;如果等待過程中,其它等待的線程被中斷,或者其它線程等待超時,或者該barrier被reset,或者當前線程在執行barrier構造時注冊的action時因為拋出異常而失敗,則拋出BrokenBarrierException
    • await(long timeout, TimeUnit unit)await()唯一的不同點在于設置了等待超時時間,等待超時時會拋出TimeoutException
    • reset() 該方法會將該barrier重置為它的初始狀態,并使得所有對該barrier的await調用拋出BrokenBarrierException

    Phaser

    Phaser適用場景

    CountDownLatch和CyclicBarrier都是JDK 1.5引入的,而Phaser是JDK 1.7引入的。Phaser的功能與CountDownLatch和CyclicBarrier有部分重疊,同時也提供了更豐富的語義和更靈活的用法。

    Phaser顧名思義,與階段相關。Phaser比較適合這樣一種場景,一種任務可以分為多個階段,現希望多個線程去處理該批任務,對于每個階段,多個線程可以并發進行,但是希望保證只有前面一個階段的任務完成之后才能開始后面的任務。這種場景可以使用多個CyclicBarrier來實現,每個CyclicBarrier負責等待一個階段的任務全部完成。但是使用CyclicBarrier的缺點在于,需要明確知道總共有多少個階段,同時并行的任務數需要提前預定義好,且無法動態修改。而Phaser可同時解決這兩個問題。

    Phaser實例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class PhaserDemo {

    public static void main(String[] args) throws IOException {
    int parties = 3;
    int phases = 4;
    final Phaser phaser = new Phaser(parties) {
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
    System.out.println("====== Phase : " + phase + " ======");
    return registeredParties == 0;
    }
    };

    for(int i = 0; i < parties; i++) {
    int threadId = i;
    Thread thread = new Thread(() -> {
    for(int phase = 0; phase < phases; phase++) {
    System.out.println(String.format("Thread %s, phase %s", threadId, phase));
    phaser.arriveAndAwaitAdvance();
    }
    });
    thread.start();
    }
    }
    }

    執行結果如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    Thread 0, phase 0
    Thread 1, phase 0
    Thread 2, phase 0
    ====== Phase : 0 ======
    Thread 2, phase 1
    Thread 0, phase 1
    Thread 1, phase 1
    ====== Phase : 1 ======
    Thread 1, phase 2
    Thread 2, phase 2
    Thread 0, phase 2
    ====== Phase : 2 ======
    Thread 0, phase 3
    Thread 1, phase 3
    Thread 2, phase 3
    ====== Phase : 3 ======

    從上面的結果可以看到,多個線程必須等到其它線程的同一階段的任務全部完成才能進行到下一個階段,并且每當完成某一階段任務時,Phaser都會執行其onAdvance方法。

    Phaser主要接口分析

    Phaser主要接口如下

    • arriveAndAwaitAdvance() 當前線程當前階段執行完畢,等待其它線程完成當前階段。如果當前線程是該階段最后一個未到達的,則該方法直接返回下一個階段的序號(階段序號從0開始),同時其它線程的該方法也返回下一個階段的序號。
    • arriveAndDeregister() 該方法立即返回下一階段的序號,并且其它線程需要等待的個數減一,并且把當前線程從之后需要等待的成員中移除。如果該Phaser是另外一個Phaser的子Phaser(層次化Phaser會在后文中講到),并且該操作導致當前Phaser的成員數為0,則該操作也會將當前Phaser從其父Phaser中移除。
    • arrive() 該方法不作任何等待,直接返回下一階段的序號。
    • awaitAdvance(int phase) 該方法等待某一階段執行完畢。如果當前階段不等于指定的階段或者該Phaser已經被終止,則立即返回。該階段數一般由arrive()方法或者arriveAndDeregister()方法返回。返回下一階段的序號,或者返回參數指定的值(如果該參數為負數),或者直接返回當前階段序號(如果當前Phaser已經被終止)。
    • awaitAdvanceInterruptibly(int phase) 效果與awaitAdvance(int phase)相當,唯一的不同在于若該線程在該方法等待時被中斷,則該方法拋出InterruptedException
    • awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) 效果與awaitAdvanceInterruptibly(int phase)相當,區別在于如果超時則拋出TimeoutException
    • bulkRegister(int parties) 注冊多個party。如果當前phaser已經被終止,則該方法無效,并返回負數。如果調用該方法時,onAdvance方法正在執行,則該方法等待其執行完畢。如果該Phaser有父Phaser則指定的party數大于0,且之前該Phaser的party數為0,那么該Phaser會被注冊到其父Phaser中。
    • forceTermination() 強制讓該Phaser進入終止狀態。已經注冊的party數不受影響。如果該Phaser有子Phaser,則其所有的子Phaser均進入終止狀態。如果該Phaser已經處于終止狀態,該方法調用不造成任何影響。

    Java進階系列

    郭俊 Jason wechat
    歡迎關注作者微信公眾號【大數據架構】
    您的贊賞將支持作者繼續原創分享
    速赢彩app 陕西西安 | 泰州 | 宣城 | 顺德 | 白城 | 深圳 | 哈密 | 六盘水 | 牡丹江 | 福建福州 | 锦州 | 资阳 | 清远 | 佳木斯 | 七台河 | 崇左 | 贵州贵阳 | 宁波 | 澄迈 | 开封 | 随州 | 温州 | 武夷山 | 衡水 | 南京 | 宣城 | 揭阳 | 芜湖 | 沧州 | 乳山 | 呼伦贝尔 | 鄢陵 | 定安 | 台州 | 香港香港 | 荆门 | 瓦房店 | 瓦房店 | 临汾 | 怒江 | 保定 | 永康 | 湖南长沙 | 灌云 | 钦州 | 湛江 | 大兴安岭 | 建湖 | 泰兴 | 通辽 | 周口 | 馆陶 | 商丘 | 大丰 | 揭阳 | 陕西西安 | 兴安盟 | 海南海口 | 象山 | 晋中 | 宝鸡 | 文昌 | 黄南 | 乐山 | 伊犁 | 潜江 | 乐山 | 辽宁沈阳 | 怒江 | 泰安 | 如皋 | 莱芜 | 新泰 | 大庆 | 玉林 | 淄博 | 吉林 | 阿拉尔 | 延安 | 长兴 | 辽源 | 灵宝 | 青海西宁 | 杞县 | 武威 | 玉林 | 张掖 | 仙桃 | 湘潭 | 菏泽 | 榆林 | 博罗 | 江门 | 郴州 | 凉山 | 海拉尔 | 文山 | 陕西西安 | 潮州 | 张北 | 扬州 | 廊坊 | 双鸭山 | 广州 | 南阳 | 临夏 | 如东 | 库尔勒 | 莱芜 | 庆阳 | 大丰 | 四川成都 | 襄阳 |