• <menu id="gyiem"><menu id="gyiem"></menu></menu>
  • <menu id="gyiem"><code id="gyiem"></code></menu>

    Java進階(六)從ConcurrentHashMap的演進看Java多線程核心技術

    原創文章,轉載請務必將下面這段話置于文章開頭處(保留超鏈接)。
    本文轉發自技術世界原文鏈接 http://www.luozeyang.com/java/concurrenthashmap/

    線程不安全的HashMap

    眾所周知,HashMap是非線程安全的。而HashMap的線程不安全主要體現在resize時的死循環及使用迭代器時的fast-fail上。

    注:本章的代碼均基于JDK 1.7.0_67

    HashMap工作原理

    HashMap數據結構

    常用的底層數據結構主要有數組和鏈表。數組存儲區間連續,占用內存較多,尋址容易,插入和刪除困難。鏈表存儲區間離散,占用內存較少,尋址困難,插入和刪除容易。

    HashMap要實現的是哈希表的效果,盡量實現O(1)級別的增刪改查。它的具體實現則是同時使用了數組和鏈表,可以認為最外層是一個數組,數組的每個元素是一個鏈表的表頭。

    HashMap尋址方式

    對于新插入的數據或者待讀取的數據,HashMap將Key的哈希值對數組長度取模,結果作為該Entry在數組中的index。在計算機中,取模的代價遠高于位操作的代價,因此HashMap要求數組的長度必須為2的N次方。此時將Key的哈希值對2^N-1進行與運算,其效果即與取模等效。HashMap并不要求用戶在指定HashMap容量時必須傳入一個2的N次方的整數,而是會通過Integer.highestOneBit算出比指定整數大的最小的2^N值,其實現方法如下。

    1
    2
    3
    4
    5
    6
    7
    8
    public static int highestOneBit(int i) {
    i |= (i >> 1);
    i |= (i >> 2);
    i |= (i >> 4);
    i |= (i >> 8);
    i |= (i >> 16);
    return i - (i >>> 1);
    }

    由于Key的哈希值的分布直接決定了所有數據在哈希表上的分布或者說決定了哈希沖突的可能性,因此為防止糟糕的Key的hashCode實現(例如低位都相同,只有高位不相同,與2^N-1取與后的結果都相同),JDK 1.7的HashMap通過如下方法使得最終的哈希值的二進制形式中的1盡量均勻分布從而盡可能減少哈希沖突。

    1
    2
    3
    4
    int h = hashSeed;
    h ^= k.hashCode();
    h ^= (h >>> 20) ^ (h >>> 12);
    return h ^ (h >>> 7) ^ (h >>> 4);

    resize死循環

    transfer方法

    當HashMap的size超過Capacity*loadFactor時,需要對HashMap進行擴容。具體方法是,創建一個新的,長度為原來Capacity兩倍的數組,保證新的Capacity仍為2的N次方,從而保證上述尋址方式仍適用。同時需要通過如下transfer方法將原來的所有數據全部重新插入(rehash)到新的數組中。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    void transfer(Entry[] newTable, boolean rehash) {
    int newCapacity = newTable.length;
    for (Entry<K,V> e : table) {
    while(null != e) {
    Entry<K,V> next = e.next;
    if (rehash) {
    e.hash = null == e.key ? 0 : hash(e.key);
    }
    int i = indexFor(e.hash, newCapacity);
    e.next = newTable[i];
    newTable[i] = e;
    e = next;
    }
    }
    }

    該方法并不保證線程安全,而且在多線程并發調用時,可能出現死循環。其執行過程如下。從步驟2可見,轉移時鏈表順序反轉。

    1. 遍歷原數組中的元素
    2. 對鏈表上的每一個節點遍歷:用next取得要轉移那個元素的下一個,將e轉移到新數組的頭部,使用頭插法插入節點
    3. 循環2,直到鏈表節點全部轉移
    4. 循環1,直到所有元素全部轉移

    單線程rehash

    單線程情況下,rehash無問題。下圖演示了單線程條件下的rehash過程
    HashMap rehash single thread

    多線程并發下的rehash

    這里假設有兩個線程同時執行了put操作并引發了rehash,執行了transfer方法,并假設線程一進入transfer方法并執行完next = e.next后,因為線程調度所分配時間片用完而“暫停”,此時線程二完成了transfer方法的執行。此時狀態如下。

    HashMap rehash multi thread step 1

    接著線程1被喚醒,繼續執行第一輪循環的剩余部分

    1
    2
    3
    e.next = newTable[1] = null
    newTable[1] = e = key(5)
    e = next = key(9)

    結果如下圖所示
    HashMap rehash multi thread step 2

    接著執行下一輪循環,結果狀態圖如下所示
    HashMap rehash multi thread step 3

    繼續下一輪循環,結果狀態圖如下所示
    HashMap rehash multi thread step 4

    此時循環鏈表形成,并且key(11)無法加入到線程1的新數組。在下一次訪問該鏈表時會出現死循環。

    Fast-fail

    產生原因

    在使用迭代器的過程中如果HashMap被修改,那么ConcurrentModificationException將被拋出,也即Fast-fail策略。

    當HashMap的iterator()方法被調用時,會構造并返回一個新的EntryIterator對象,并將EntryIterator的expectedModCount設置為HashMap的modCount(該變量記錄了HashMap被修改的次數)。

    1
    2
    3
    4
    5
    6
    7
    8
    HashIterator() {
    expectedModCount = modCount;
    if (size > 0) { // advance to first entry
    Entry[] t = table;
    while (index < t.length && (next = t[index++]) == null)
    ;
    }
    }

    在通過該Iterator的next方法訪問下一個Entry時,它會先檢查自己的expectedModCount與HashMap的modCount是否相等,如果不相等,說明HashMap被修改,直接拋出ConcurrentModificationException。該Iterator的remove方法也會做類似的檢查。該異常的拋出意在提醒用戶及早意識到線程安全問題。

    線程安全解決方案

    單線程條件下,為避免出現ConcurrentModificationException,需要保證只通過HashMap本身或者只通過Iterator去修改數據,不能在Iterator使用結束之前使用HashMap本身的方法修改數據。因為通過Iterator刪除數據時,HashMap的modCount和Iterator的expectedModCount都會自增,不影響二者的相等性。如果是增加數據,只能通過HashMap本身的方法完成,此時如果要繼續遍歷數據,需要重新調用iterator()方法從而重新構造出一個新的Iterator,使得新Iterator的expectedModCount與更新后的HashMap的modCount相等。

    多線程條件下,可使用Collections.synchronizedMap方法構造出一個同步Map,或者直接使用線程安全的ConcurrentHashMap。

    Java 7基于分段鎖的ConcurrentHashMap

    注:本章的代碼均基于JDK 1.7.0_67

    數據結構

    Java 7中的ConcurrentHashMap的底層數據結構仍然是數組和鏈表。與HashMap不同的是,ConcurrentHashMap最外層不是一個大的數組,而是一個Segment的數組。每個Segment包含一個與HashMap數據結構差不多的鏈表數組。整體數據結構如下圖所示。
    JAVA 7 ConcurrentHashMap

    尋址方式

    在讀寫某個Key時,先取該Key的哈希值。并將哈希值的高N位對Segment個數取模從而得到該Key應該屬于哪個Segment,接著如同操作HashMap一樣操作這個Segment。為了保證不同的值均勻分布到不同的Segment,需要通過如下方法計算哈希值。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    private int hash(Object k) {
    int h = hashSeed;
    if ((0 != h) && (k instanceof String)) {
    return sun.misc.Hashing.stringHash32((String) k);
    }
    h ^= k.hashCode();
    h += (h << 15) ^ 0xffffcd7d;
    h ^= (h >>> 10);
    h += (h << 3);
    h ^= (h >>> 6);
    h += (h << 2) + (h << 14);
    return h ^ (h >>> 16);
    }

    同樣為了提高取模運算效率,通過如下計算,ssize即為大于concurrencyLevel的最小的2的N次方,同時segmentMask為2^N-1。這一點跟上文中計算數組長度的方法一致。對于某一個Key的哈希值,只需要向右移segmentShift位以取高sshift位,再與segmentMask取與操作即可得到它在Segment數組上的索引。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
    ++sshift;
    ssize <<= 1;
    }
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];

    同步方式

    Segment繼承自ReentrantLock,所以我們可以很方便的對每一個Segment上鎖。

    對于讀操作,獲取Key所在的Segment時,需要保證可見性(請參考如何保證多線程條件下的可見性)。具體實現上可以使用volatile關鍵字,也可使用鎖。但使用鎖開銷太大,而使用volatile時每次寫操作都會讓所有CPU內緩存無效,也有一定開銷。ConcurrentHashMap使用如下方法保證可見性,取得最新的Segment。

    1
    Segment<K,V> s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)

    獲取Segment中的HashEntry時也使用了類似方法

    1
    2
    HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
    (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE)

    對于寫操作,并不要求同時獲取所有Segment的鎖,因為那樣相當于鎖住了整個Map。它會先獲取該Key-Value對所在的Segment的鎖,獲取成功后就可以像操作一個普通的HashMap一樣操作該Segment,并保證該Segment的安全性。
    同時由于其它Segment的鎖并未被獲取,因此理論上可支持concurrencyLevel(等于Segment的個數)個線程安全的并發讀寫。

    獲取鎖時,并不直接使用lock來獲取,因為該方法獲取鎖失敗時會掛起(參考可重入鎖)。事實上,它使用了自旋鎖,如果tryLock獲取鎖失敗,說明鎖被其它線程占用,此時通過循環再次以tryLock的方式申請鎖。如果在循環過程中該Key所對應的鏈表頭被修改,則重置retry次數。如果retry次數超過一定值,則使用lock方法申請鎖。

    這里使用自旋鎖是因為自旋鎖的效率比較高,但是它消耗CPU資源比較多,因此在自旋次數超過閾值時切換為互斥鎖。

    size操作

    put、remove和get操作只需要關心一個Segment,而size操作需要遍歷所有的Segment才能算出整個Map的大小。一個簡單的方案是,先鎖住所有Sgment,計算完后再解鎖。但這樣做,在做size操作時,不僅無法對Map進行寫操作,同時也無法進行讀操作,不利于對Map的并行操作。

    為更好支持并發操作,ConcurrentHashMap會在不上鎖的前提逐個Segment計算3次size,如果某相鄰兩次計算獲取的所有Segment的更新次數(每個Segment都與HashMap一樣通過modCount跟蹤自己的修改次數,Segment每修改一次其modCount加一)相等,說明這兩次計算過程中無更新操作,則這兩次計算出的總size相等,可直接作為最終結果返回。如果這三次計算過程中Map有更新,則對所有Segment加鎖重新計算Size。該計算方法代碼如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    public int size() {
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum; // sum of modCounts
    long last = 0L; // previous sum
    int retries = -1; // first iteration isn't retry
    try {
    for (;;) {
    if (retries++ == RETRIES_BEFORE_LOCK) {
    for (int j = 0; j < segments.length; ++j)
    ensureSegment(j).lock(); // force creation
    }
    sum = 0L;
    size = 0;
    overflow = false;
    for (int j = 0; j < segments.length; ++j) {
    Segment<K,V> seg = segmentAt(segments, j);
    if (seg != null) {
    sum += seg.modCount;
    int c = seg.count;
    if (c < 0 || (size += c) < 0)
    overflow = true;
    }
    }
    if (sum == last)
    break;
    last = sum;
    }
    } finally {
    if (retries > RETRIES_BEFORE_LOCK) {
    for (int j = 0; j < segments.length; ++j)
    segmentAt(segments, j).unlock();
    }
    }
    return overflow ? Integer.MAX_VALUE : size;
    }

    不同之處

    ConcurrentHashMap與HashMap相比,有以下不同點

    • ConcurrentHashMap線程安全,而HashMap非線程安全
    • HashMap允許Key和Value為null,而ConcurrentHashMap不允許
    • HashMap不允許通過Iterator遍歷的同時通過HashMap修改,而ConcurrentHashMap允許該行為,并且該更新對后續的遍歷可見

    Java 8基于CAS的ConcurrentHashMap

    注:本章的代碼均基于JDK 1.8.0_111

    數據結構

    Java 7為實現并行訪問,引入了Segment這一結構,實現了分段鎖,理論上最大并發度與Segment個數相等。Java 8為進一步提高并發性,摒棄了分段鎖的方案,而是直接使用一個大的數組。同時為了提高哈希碰撞下的尋址性能,Java 8在鏈表長度超過一定閾值(8)時將鏈表(尋址時間復雜度為O(N))轉換為紅黑樹(尋址時間復雜度為O(long(N)))。其數據結構如下圖所示


    JAVA 8 ConcurrentHashMap

    尋址方式

    Java 8的ConcurrentHashMap同樣是通過Key的哈希值與數組長度取模確定該Key在數組中的索引。同樣為了避免不太好的Key的hashCode設計,它通過如下方法計算得到Key的最終哈希值。不同的是,Java 8的ConcurrentHashMap作者認為引入紅黑樹后,即使哈希沖突比較嚴重,尋址效率也足夠高,所以作者并未在哈希值的計算上做過多設計,只是將Key的hashCode值與其高16位作異或并保證最高位為0(從而保證最終結果為正整數)。

    1
    2
    3
    static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
    }

    同步方式

    對于put操作,如果Key對應的數組元素為null,則通過CAS操作將其設置為當前值。如果Key對應的數組元素(也即鏈表表頭或者樹的根元素)不為null,則對該元素使用synchronized關鍵字申請鎖,然后進行操作。如果該put操作使得當前鏈表長度超過一定閾值,則將該鏈表轉換為樹,從而提高尋址效率。

    對于讀操作,由于數組被volatile關鍵字修飾,因此不用擔心數組的可見性問題。同時每個元素是一個Node實例(Java 7中每個元素是一個HashEntry),它的Key值和hash值都由final修飾,不可變更,無須關心它們被修改后的可見性問題。而其Value及對下一個元素的引用由volatile修飾,可見性也有保障。

    1
    2
    3
    4
    5
    6
    static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    }

    對于Key對應的數組元素的可見性,由Unsafe的getObjectVolatile方法保證。

    1
    2
    3
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

    size操作

    put方法和remove方法都會通過addCount方法維護Map的size。size方法通過sumCount獲取由addCount方法維護的Map的size。

    Java進階系列

    郭俊 Jason wechat
    歡迎關注作者微信公眾號【大數據架構】
    您的贊賞將支持作者繼續原創分享
    速赢彩app 淮北 | 鄂州 | 惠东 | 天水 | 鹤岗 | 涿州 | 乐平 | 宜都 | 鄂尔多斯 | 宁国 | 云南昆明 | 图木舒克 | 锦州 | 菏泽 | 偃师 | 金昌 | 西藏拉萨 | 包头 | 荣成 | 临海 | 湖南长沙 | 沧州 | 福建福州 | 乌海 | 清远 | 克拉玛依 | 无锡 | 阿勒泰 | 阳泉 | 永州 | 景德镇 | 东莞 | 亳州 | 日喀则 | 株洲 | 廊坊 | 莒县 | 岳阳 | 固原 | 义乌 | 宜宾 | 开封 | 三亚 | 忻州 | 慈溪 | 孝感 | 莱州 | 绵阳 | 芜湖 | 酒泉 | 洛阳 | 山西太原 | 四平 | 西藏拉萨 | 六盘水 | 海安 | 荆门 | 吕梁 | 芜湖 | 东方 | 任丘 | 三亚 | 定西 | 衢州 | 章丘 | 鄂州 | 陇南 | 宁德 | 马鞍山 | 启东 | 四平 | 陕西西安 | 开封 | 澄迈 | 楚雄 | 永州 | 莱芜 | 衡水 | 南阳 | 泉州 | 简阳 | 张北 | 简阳 | 潜江 | 云南昆明 | 鹤壁 | 上饶 | 黄山 | 张掖 | 凉山 | 运城 | 兴安盟 | 澳门澳门 | 安吉 | 保亭 | 泰兴 | 宁波 | 阿克苏 | 天门 | 甘孜 | 固原 | 七台河 | 湖州 | 德阳 | 开封 | 惠州 | 湖州 | 清徐 | 通辽 | 普洱 | 三沙 | 海安 | 滨州 |