• <menu id="gyiem"><menu id="gyiem"></menu></menu>
  • <menu id="gyiem"><code id="gyiem"></code></menu>

    Spark性能優化之道——解決Spark數據傾斜(Data Skew)的N種姿勢

    原創文章,轉載請務必將下面這段話置于文章開頭處。
    本文轉發自技術世界原文鏈接 http://www.luozeyang.com/spark/skew/

    摘要

    本文結合實例詳細闡明了Spark數據傾斜的幾種場景以及對應的解決方案,包括避免數據源傾斜,調整并行度,使用自定義Partitioner,使用Map側Join代替Reduce側Join,給傾斜Key加上隨機前綴等。

    為何要處理數據傾斜(Data Skew)

    什么是數據傾斜

    對Spark/Hadoop這樣的大數據系統來講,數據量大并不可怕,可怕的是數據傾斜。

    何謂數據傾斜?數據傾斜指的是,并行處理的數據集中,某一部分(如Spark或Kafka的一個Partition)的數據顯著多于其它部分,從而使得該部分的處理速度成為整個數據集處理的瓶頸。

    對于分布式系統而言,理想情況下,隨著系統規模(節點數量)的增加,應用整體耗時線性下降。如果一臺機器處理一批大量數據需要120分鐘,當機器數量增加到三時,理想的耗時為120 / 3 = 40分鐘,如下圖所示
    ideal scale out
      
    但是,上述情況只是理想情況,實際上將單機任務轉換成分布式任務后,會有overhead,使得總的任務量較之單機時有所增加,所以每臺機器的執行時間加起來比單臺機器時更大。這里暫不考慮這些overhead,假設單機任務轉換成分布式任務后,總任務量不變。
      
    但即使如此,想做到分布式情況下每臺機器執行時間是單機時的1 / N,就必須保證每臺機器的任務量相等。不幸的是,很多時候,任務的分配是不均勻的,甚至不均勻到大部分任務被分配到個別機器上,其它大部分機器所分配的任務量只占總得的小部分。比如一臺機器負責處理80%的任務,另外兩臺機器各處理10%的任務,如下圖所示
    unideal scale out
      
    在上圖中,機器數據增加為三倍,但執行時間只降為原來的80%,遠低于理想值。   

    數據傾斜的危害

    從上圖可見,當出現數據傾斜時,小量任務耗時遠高于其它任務,從而使得整體耗時過大,未能充分發揮分布式系統的并行計算優勢。
      
    另外,當發生數據傾斜時,部分任務處理的數據量過大,可能造成內存不足使得任務失敗,并進而引進整個應用失敗。   

    數據傾斜是如何造成的

    在Spark中,同一個Stage的不同Partition可以并行處理,而具有依賴關系的不同Stage之間是串行處理的。假設某個Spark Job分為Stage 0和Stage 1兩個Stage,且Stage 1依賴于Stage 0,那Stage 0完全處理結束之前不會處理Stage 1。而Stage 0可能包含N個Task,這N個Task可以并行進行。如果其中N-1個Task都在10秒內完成,而另外一個Task卻耗時1分鐘,那該Stage的總時間至少為1分鐘。換句話說,一個Stage所耗費的時間,主要由最慢的那個Task決定。

    由于同一個Stage內的所有Task執行相同的計算,在排除不同計算節點計算能力差異的前提下,不同Task之間耗時的差異主要由該Task所處理的數據量決定。

    Stage的數據來源主要分為如下兩類

    • 從數據源直接讀取。如讀取HDFS,Kafka
    • 讀取上一個Stage的Shuffle數據

    如何緩解/消除數據傾斜

    避免數據源的數據傾斜 ———— 讀Kafka

    以Spark Stream通過DirectStream方式讀取Kafka數據為例。由于Kafka的每一個Partition對應Spark的一個Task(Partition),所以Kafka內相關Topic的各Partition之間數據是否平衡,直接決定Spark處理該數據時是否會產生數據傾斜。

    如《Kafka設計解析(一)- Kafka背景及架構介紹》一文所述,Kafka某一Topic內消息在不同Partition之間的分布,主要由Producer端所使用的Partition實現類決定。如果使用隨機Partitioner,則每條消息會隨機發送到一個Partition中,從而從概率上來講,各Partition間的數據會達到平衡。此時源Stage(直接讀取Kafka數據的Stage)不會產生數據傾斜。

    但很多時候,業務場景可能會要求將具備同一特征的數據順序消費,此時就需要將具有相同特征的數據放于同一個Partition中。一個典型的場景是,需要將同一個用戶相關的PV信息置于同一個Partition中。此時,如果產生了數據傾斜,則需要通過其它方式處理。

    避免數據源的數據傾斜 ———— 讀文件

    原理

    Spark以通過textFile(path, minPartitions)方法讀取文件時,使用TextFileFormat。

    對于不可切分的文件,每個文件對應一個Split從而對應一個Partition。此時各文件大小是否一致,很大程度上決定了是否存在數據源側的數據傾斜。另外,對于不可切分的壓縮文件,即使壓縮后的文件大小一致,它所包含的實際數據量也可能差別很多,因為源文件數據重復度越高,壓縮比越高。反過來,即使壓縮文件大小接近,但由于壓縮比可能差距很大,所需處理的數據量差距也可能很大。

    此時可通過在數據生成端將不可切分文件存儲為可切分文件,或者保證各文件包含數據量相同的方式避免數據傾斜。

    對于可切分的文件,每個Split大小由如下算法決定。其中goalSize等于所有文件總大小除以minPartitions。而blockSize,如果是HDFS文件,由文件本身的block大小決定;如果是Linux本地文件,且使用本地模式,由fs.local.block.size決定。

    1
    2
    3
    protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    默認情況下各Split的大小不會太大,一般相當于一個Block大小(在Hadoop 2中,默認值為128MB),所以數據傾斜問題不明顯。如果出現了嚴重的數據傾斜,可通過上述參數調整。

    案例

    現通過腳本生成一些文本文件,并通過如下代碼進行簡單的單詞計數。為避免Shuffle,只計單詞總個數,不須對單詞進行分組計數。

    1
    2
    3
    4
    5
    6
    7
    SparkConf sparkConf = new SparkConf()
    .setAppName("ReadFileSkewDemo");
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    long count = javaSparkContext.textFile(inputFile, minPartitions)
    .flatMap((String line) -> Arrays.asList(line.split(" ")).iterator()).count();
    System.out.printf("total words : %s", count);
    javaSparkContext.stop();

    總共生成如下11個csv文件,其中10個大小均為271.9MB,另外一個大小為8.5GB。
    uncompressed files

    之后將8.5GB大小的文件使用gzip壓縮,壓縮后大小僅為25.3MB。
    compressed files

    使用如上代碼對未壓縮文件夾進行單詞計數操作。Split大小為 max(minSize, min(goalSize, blockSize) = max(1 B, min((271.9 10+8.5 1024) / 1 MB, 128 MB) = 128MB。無明顯數據傾斜。
    splitable_unskewed

    使用同樣代碼對包含壓縮文件的文件夾進行同樣的單詞計數操作。未壓縮文件的Split大小仍然為128MB,而壓縮文件(gzip壓縮)由于不可切分,且大小僅為25.3MB,因此該文件作為一個單獨的Split/Partition。雖然該文件相對較小,但是它由8.5GB文件壓縮而來,包含數據量是其它未壓縮文件的32倍,因此處理該Split/Partition/文件的Task耗時為4.4分鐘,遠高于其它Task的10秒。
    compressed file skew

    由于上述gzip壓縮文件大小為25.3MB,小于128MB的Split大小,不能證明gzip壓縮文件不可切分。現將minPartitions從默認的1設置為229,從而目標Split大小為max(minSize, min(goalSize, blockSize) = max(1 B, min((271.9 * 10+25.3) / 229 MB, 128 MB) = 12 MB。如果gzip壓縮文件可切分,則所有Split/Partition大小都不會遠大于12。反之,如果仍然存在25.3MB的Partition,則說明gzip壓縮文件確實不可切分,在生成不可切分文件時需要如上文所述保證各文件數量大大致相同。

    如下圖所示,gzip壓縮文件對應的Split/Partition大小為25.3MB,其它Split大小均為12MB左右。而該Task耗時4.7分鐘,遠大于其它Task的4秒。
    compressed unsplitable file skew

    總結

    適用場景
    數據源側存在不可切分文件,且文件內包含的數據量相差較大。

    解決方案
    盡量使用可切分的格式代替不可切分的格式,或者保證各文件實際包含數據量大致相同。

    優勢
    可撤底消除數據源側數據傾斜,效果顯著。

    劣勢
    數據源一般來源于外部系統,需要外部系統的支持。

    調整并行度分散同一個Task的不同Key

    原理

    Spark在做Shuffle時,默認使用HashPartitioner(非Hash Shuffle)對數據進行分區。如果并行度設置的不合適,可能造成大量不相同的Key對應的數據被分配到了同一個Task上,造成該Task所處理的數據遠大于其它Task,從而造成數據傾斜。

    如果調整Shuffle時的并行度,使得原本被分配到同一Task的不同Key發配到不同Task上處理,則可降低原Task所需處理的數據量,從而緩解數據傾斜問題造成的短板效應。
    spark change parallelism

    案例

    現有一張測試表,名為student_external,內有10.5億條數據,每條數據有一個唯一的id值。現從中取出id取值為9億到10.5億的共1.5億條數據,并通過一些處理,使得id為9億到9.4億間的所有數據對12取模后余數為8(即在Shuffle并行度為12時該數據集全部被HashPartition分配到第8個Task),其它數據集對其id除以100取整,從而使得id大于9.4億的數據在Shuffle時可被均勻分配到所有Task中,而id小于9.4億的數據全部分配到同一個Task中。處理過程如下

    1
    2
    3
    4
    5
    6
    7
    INSERT OVERWRITE TABLE test
    SELECT CASE WHEN id < 940000000 THEN (9500000 + (CAST (RAND() * 8 AS INTEGER)) * 12 )
    ELSE CAST(id/100 AS INTEGER)
    END,
    name
    FROM student_external
    WHERE id BETWEEN 900000000 AND 1050000000;

    通過上述處理,一份可能造成后續數據傾斜的測試數據即以準備好。接下來,使用Spark讀取該測試數據,并通過groupByKey(12)對id分組處理,且Shuffle并行度為12。代碼如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public class SparkDataSkew {
    public static void main(String[] args) {
    SparkSession sparkSession = SparkSession.builder()
    .appName("SparkDataSkewTunning")
    .config("hive.metastore.uris", "thrift://hadoop1:9083")
    .enableHiveSupport()
    .getOrCreate();

    Dataset<Row> dataframe = sparkSession.sql( "select * from test");
    dataframe.toJavaRDD()
    .mapToPair((Row row) -> new Tuple2<Integer, String>(row.getInt(0),row.getString(1)))
    .groupByKey(12)
    .mapToPair((Tuple2<Integer, Iterable<String>> tuple) -> {
    int id = tuple._1();
    AtomicInteger atomicInteger = new AtomicInteger(0);
    tuple._2().forEach((String name) -> atomicInteger.incrementAndGet());
    return new Tuple2<Integer, Integer>(id, atomicInteger.get());
    }).count();

    sparkSession.stop();
    sparkSession.close();
    }

    }

    本次實驗所使用集群節點數為4,每個節點可被Yarn使用的CPU核數為16,內存為16GB。使用如下方式提交上述應用,將啟動4個Executor,每個Executor可使用核數為12(該配置并非生產環境下的最優配置,僅用于本文實驗),可用內存為12GB。

    1
    spark-submit --queue ambari --num-executors 4 --executor-cores 12 --executor-memory 12g --class com.jasongj.spark.driver.SparkDataSkew --master yarn --deploy-mode client SparkExample-with-dependencies-1.0.jar

    GroupBy Stage的Task狀態如下圖所示,Task 8處理的記錄數為4500萬,遠大于(9倍于)其它11個Task處理的500萬記錄。而Task 8所耗費的時間為38秒,遠高于其它11個Task的平均時間(16秒)。整個Stage的時間也為38秒,該時間主要由最慢的Task 8決定。
    data skew

    在這種情況下,可以通過調整Shuffle并行度,使得原來被分配到同一個Task(即該例中的Task 8)的不同Key分配到不同Task,從而降低Task 8所需處理的數據量,緩解數據傾斜。

    通過groupByKey(48)將Shuffle并行度調整為48,重新提交到Spark。新的Job的GroupBy Stage所有Task狀態如下圖所示。
    add parallelism

    從上圖可知,記錄數最多的Task 20處理的記錄數約為1125萬,相比于并行度為12時Task 8的4500萬,降低了75%左右,而其耗時從原來Task 8的38秒降到了24秒。

    在這種場景下,調整并行度,并不意味著一定要增加并行度,也可能是減小并行度。如果通過groupByKey(11)將Shuffle并行度調整為11,重新提交到Spark。新Job的GroupBy Stage的所有Task狀態如下圖所示。
    reduce parallelism

    從上圖可見,處理記錄數最多的Task 6所處理的記錄數約為1045萬,耗時為23秒。處理記錄數最少的Task 1處理的記錄數約為545萬,耗時12秒。

    總結

    適用場景
    大量不同的Key被分配到了相同的Task造成該Task數據量過大。

    解決方案
    調整并行度。一般是增大并行度,但有時如本例減小并行度也可達到效果。

    優勢
    實現簡單,可在需要Shuffle的操作算子上直接設置并行度或者使用spark.default.parallelism設置。如果是Spark SQL,還可通過SET spark.sql.shuffle.partitions=[num_tasks]設置并行度。可用最小的代價解決問題。一般如果出現數據傾斜,都可以通過這種方法先試驗幾次,如果問題未解決,再嘗試其它方法。

    劣勢
    適用場景少,只能將分配到同一Task的不同Key分散開,但對于同一Key傾斜嚴重的情況該方法并不適用。并且該方法一般只能緩解數據傾斜,沒有徹底消除問題。從實踐經驗來看,其效果一般。

    自定義Partitioner

    原理

    使用自定義的Partitioner(默認為HashPartitioner),將原本被分配到同一個Task的不同Key分配到不同Task。

    案例

    以上述數據集為例,繼續將并發度設置為12,但是在groupByKey算子上,使用自定義的Partitioner(實現如下)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    .groupByKey(new Partitioner() {
    @Override
    public int numPartitions() {
    return 12;
    }

    @Override
    public int getPartition(Object key) {
    int id = Integer.parseInt(key.toString());
    if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {
    return (id - 9500000) / 12;
    } else {
    return id % 12;
    }
    }
    })

    由下圖可見,使用自定義Partition后,耗時最長的Task 6處理約1000萬條數據,用時15秒。并且各Task所處理的數據集大小相當。
    customizec partitioner

    總結

    適用場景
    大量不同的Key被分配到了相同的Task造成該Task數據量過大。

    解決方案
    使用自定義的Partitioner實現類代替默認的HashPartitioner,盡量將所有不同的Key均勻分配到不同的Task中。

    優勢
    不影響原有的并行度設計。如果改變并行度,后續Stage的并行度也會默認改變,可能會影響后續Stage。

    劣勢
    適用場景有限,只能將不同Key分散開,對于同一Key對應數據集非常大的場景不適用。效果與調整并行度類似,只能緩解數據傾斜而不能完全消除數據傾斜。而且需要根據數據特點自定義專用的Partitioner,不夠靈活。

    將Reduce side Join轉變為Map side Join

    原理

    通過Spark的Broadcast機制,將Reduce側Join轉化為Map側Join,避免Shuffle從而完全消除Shuffle帶來的數據傾斜。
    spark map join

    案例

    通過如下SQL創建一張具有傾斜Key且總記錄數為1.5億的大表test。

    1
    2
    3
    4
    5
    6
    INSERT OVERWRITE TABLE test
    SELECT CAST(CASE WHEN id < 980000000 THEN (95000000 + (CAST (RAND() * 4 AS INT) + 1) * 48 )
    ELSE CAST(id/10 AS INT) END AS STRING),
    name
    FROM student_external
    WHERE id BETWEEN 900000000 AND 1050000000;

    使用如下SQL創建一張數據分布均勻且總記錄數為50萬的小表test_new。

    1
    2
    3
    4
    5
    INSERT OVERWRITE TABLE test_new
    SELECT CAST(CAST(id/10 AS INT) AS STRING),
    name
    FROM student_delta_external
    WHERE id BETWEEN 950000000 AND 950500000;

    直接通過Spark Thrift Server提交如下SQL將表test與表test_new進行Join并將Join結果存于表test_join中。

    1
    2
    3
    4
    5
    INSERT OVERWRITE TABLE test_join
    SELECT test_new.id, test_new.name
    FROM test
    JOIN test_new
    ON test.id = test_new.id;

    該SQL對應的DAG如下圖所示。從該圖可見,該執行過程總共分為三個Stage,前兩個用于從Hive中讀取數據,同時二者進行Shuffle,通過最后一個Stage進行Join并將結果寫入表test_join中。
    reduce join DAG

    從下圖可見,Join Stage各Task處理的數據傾斜嚴重,處理數據量最大的Task耗時7.1分鐘,遠高于其它無數據傾斜的Task約2秒的耗時。
    reduce join DAG

    接下來,嘗試通過Broadcast實現Map側Join。實現Map側Join的方法,并非直接通過CACHE TABLE test_new將小表test_new進行cache。現通過如下SQL進行Join。

    1
    2
    3
    4
    5
    6
    CACHE TABLE test_new;
    INSERT OVERWRITE TABLE test_join
    SELECT test_new.id, test_new.name
    FROM test
    JOIN test_new
    ON test.id = test_new.id;

    通過如下DAG圖可見,該操作仍分為三個Stage,且仍然有Shuffle存在,唯一不同的是,小表的讀取不再直接掃描Hive表,而是掃描內存中緩存的表。
    reduce join DAG

    并且數據傾斜仍然存在。如下圖所示,最慢的Task耗時為7.1分鐘,遠高于其它Task的約2秒。
    reduce join DAG

    正確的使用Broadcast實現Map側Join的方式是,通過SET spark.sql.autoBroadcastJoinThreshold=104857600;將Broadcast的閾值設置得足夠大。

    再次通過如下SQL進行Join。

    1
    2
    3
    4
    5
    6
    SET spark.sql.autoBroadcastJoinThreshold=104857600;
    INSERT OVERWRITE TABLE test_join
    SELECT test_new.id, test_new.name
    FROM test
    JOIN test_new
    ON test.id = test_new.id;

    通過如下DAG圖可見,該方案只包含一個Stage。
    reduce join DAG

    并且從下圖可見,各Task耗時相當,無明顯數據傾斜現象。并且總耗時為1.5分鐘,遠低于Reduce側Join的7.3分鐘。
    reduce join DAG

    總結

    適用場景
    參與Join的一邊數據集足夠小,可被加載進Driver并通過Broadcast方法廣播到各個Executor中。

    解決方案
    在Java/Scala代碼中將小數據集數據拉取到Driver,然后通過Broadcast方案將小數據集的數據廣播到各Executor。或者在使用SQL前,將Broadcast的閾值調整得足夠大,從而使用Broadcast生效。進而將Reduce側Join替換為Map側Join。

    優勢
    避免了Shuffle,徹底消除了數據傾斜產生的條件,可極大提升性能。

    劣勢
    要求參與Join的一側數據集足夠小,并且主要適用于Join的場景,不適合聚合的場景,適用條件有限。

    為skew的key增加隨機前/后綴

    原理

    為數據量特別大的Key增加隨機前/后綴,使得原來Key相同的數據變為Key不相同的數據,從而使傾斜的數據集分散到不同的Task中,徹底解決數據傾斜問題。Join另一則的數據中,與傾斜Key對應的部分數據,與隨機前綴集作笛卡爾乘積,從而保證無論數據傾斜側傾斜Key如何加前綴,都能與之正常Join。
    spark random prefix

    案例

    通過如下SQL,將id為9億到9.08億共800萬條數據的id轉為9500048或者9500096,其它數據的id除以100取整。從而該數據集中,id為9500048和9500096的數據各400萬,其它id對應的數據記錄數均為100條。這些數據存于名為test的表中。

    對于另外一張小表test_new,取出50萬條數據,并將id(遞增且唯一)除以100取整,使得所有id都對應100條數據。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    INSERT OVERWRITE TABLE test
    SELECT CAST(CASE WHEN id < 908000000 THEN (9500000 + (CAST (RAND() * 2 AS INT) + 1) * 48 )
    ELSE CAST(id/100 AS INT) END AS STRING),
    name
    FROM student_external
    WHERE id BETWEEN 900000000 AND 1050000000;

    INSERT OVERWRITE TABLE test_new
    SELECT CAST(CAST(id/100 AS INT) AS STRING),
    name
    FROM student_delta_external
    WHERE id BETWEEN 950000000 AND 950500000;

    通過如下代碼,讀取test表對應的文件夾內的數據并轉換為JavaPairRDD存于leftRDD中,同樣讀取test表對應的數據存于rightRDD中。通過RDD的join算子對leftRDD與rightRDD進行Join,并指定并行度為48。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public class SparkDataSkew{
    public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("DemoSparkDataFrameWithSkewedBigTableDirect");
    sparkConf.set("spark.default.parallelism", String.valueOf(parallelism));
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

    JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
    .mapToPair((String row) -> {
    String[] str = row.split(",");
    return new Tuple2<String, String>(str[0], str[1]);
    });

    JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
    .mapToPair((String row) -> {
    String[] str = row.split(",");
    return new Tuple2<String, String>(str[0], str[1]);
    });

    leftRDD.join(rightRDD, parallelism)
    .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2()))
    .foreachPartition((Iterator<Tuple2<String, String>> iterator) -> {
    AtomicInteger atomicInteger = new AtomicInteger();
    iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());
    });

    javaSparkContext.stop();
    javaSparkContext.close();
    }
    }

    從下圖可看出,整個Join耗時1分54秒,其中Join Stage耗時1.7分鐘。
    few skewed key join

    通過分析Join Stage的所有Task可知,在其它Task所處理記錄數為192.71萬的同時Task 32的處理的記錄數為992.72萬,故它耗時為1.7分鐘,遠高于其它Task的約10秒。這與上文準備數據集時,將id為9500048為9500096對應的數據量設置非常大,其它id對應的數據集非常均勻相符合。
    few skewed key join

    現通過如下操作,實現傾斜Key的分散處理

    • 將leftRDD中傾斜的key(即9500048與9500096)對應的數據單獨過濾出來,且加上1到24的隨機前綴,并將前綴與原數據用逗號分隔(以方便之后去掉前綴)形成單獨的leftSkewRDD
    • 將rightRDD中傾斜key對應的數據抽取出來,并通過flatMap操作將該數據集中每條數據均轉換為24條數據(每條分別加上1到24的隨機前綴),形成單獨的rightSkewRDD
    • 將leftSkewRDD與rightSkewRDD進行Join,并將并行度設置為48,且在Join過程中將隨機前綴去掉,得到傾斜數據集的Join結果skewedJoinRDD
    • 將leftRDD中不包含傾斜Key的數據抽取出來作為單獨的leftUnSkewRDD
    • 對leftUnSkewRDD與原始的rightRDD進行Join,并行度也設置為48,得到Join結果unskewedJoinRDD
    • 通過union算子將skewedJoinRDD與unskewedJoinRDD進行合并,從而得到完整的Join結果集

    具體實現代碼如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    public class SparkDataSkew{
    public static void main(String[] args) {
    int parallelism = 48;
    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("SolveDataSkewWithRandomPrefix");
    sparkConf.set("spark.default.parallelism", parallelism + "");
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

    JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
    .mapToPair((String row) -> {
    String[] str = row.split(",");
    return new Tuple2<String, String>(str[0], str[1]);
    });

    JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
    .mapToPair((String row) -> {
    String[] str = row.split(",");
    return new Tuple2<String, String>(str[0], str[1]);
    });

    String[] skewedKeyArray = new String[]{"9500048", "9500096"};
    Set<String> skewedKeySet = new HashSet<String>();
    List<String> addList = new ArrayList<String>();
    for(int i = 1; i <=24; i++) {
    addList.add(i + "");
    }
    for(String key : skewedKeyArray) {
    skewedKeySet.add(key);
    }

    Broadcast<Set<String>> skewedKeys = javaSparkContext.broadcast(skewedKeySet);
    Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList);

    JavaPairRDD<String, String> leftSkewRDD = leftRDD
    .filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1()))
    .mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>((new Random().nextInt(24) + 1) + "," + tuple._1(), tuple._2()));

    JavaPairRDD<String, String> rightSkewRDD = rightRDD.filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1()))
    .flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream()
    .map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2()))
    .collect(Collectors.toList())
    .iterator()
    );

    JavaPairRDD<String, String> skewedJoinRDD = leftSkewRDD
    .join(rightSkewRDD, parallelism)
    .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2()));

    JavaPairRDD<String, String> leftUnSkewRDD = leftRDD.filter((Tuple2<String, String> tuple) -> !skewedKeys.value().contains(tuple._1()));
    JavaPairRDD<String, String> unskewedJoinRDD = leftUnSkewRDD.join(rightRDD, parallelism).mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2()));

    skewedJoinRDD.union(unskewedJoinRDD).foreachPartition((Iterator<Tuple2<String, String>> iterator) -> {
    AtomicInteger atomicInteger = new AtomicInteger();
    iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());
    });

    javaSparkContext.stop();
    javaSparkContext.close();
    }
    }

    從下圖可看出,整個Join耗時58秒,其中Join Stage耗時33秒。
    few skewed key join

    通過分析Join Stage的所有Task可知

    • 由于Join分傾斜數據集Join和非傾斜數據集Join,而各Join的并行度均為48,故總的并行度為96
    • 由于提交任務時,設置的Executor個數為4,每個Executor的core數為12,故可用Core數為48,所以前48個Task同時啟動(其Launch時間相同),后48個Task的啟動時間各不相同(等待前面的Task結束才開始)
    • 由于傾斜Key被加上隨機前綴,原本相同的Key變為不同的Key,被分散到不同的Task處理,故在所有Task中,未發現所處理數據集明顯高于其它Task的情況

    few skewed key join

    實際上,由于傾斜Key與非傾斜Key的操作完全獨立,可并行進行。而本實驗受限于可用總核數為48,可同時運行的總Task數為48,故而該方案只是將總耗時減少一半(效率提升一倍)。如果資源充足,可并發執行Task數增多,該方案的優勢將更為明顯。在實際項目中,該方案往往可提升數倍至10倍的效率。

    總結

    適用場景
    兩張表都比較大,無法使用Map則Join。其中一個RDD有少數幾個Key的數據量過大,另外一個RDD的Key分布較為均勻。

    解決方案
    將有數據傾斜的RDD中傾斜Key對應的數據集單獨抽取出來加上隨機前綴,另外一個RDD每條數據分別與隨機前綴結合形成新的RDD(相當于將其數據增到到原來的N倍,N即為隨機前綴的總個數),然后將二者Join并去掉前綴。然后將不包含傾斜Key的剩余數據進行Join。最后將兩次Join的結果集通過union合并,即可得到全部Join結果。

    優勢
    相對于Map則Join,更能適應大數據集的Join。如果資源充足,傾斜部分數據集與非傾斜部分數據集可并行進行,效率提升明顯。且只針對傾斜部分的數據做數據擴展,增加的資源消耗有限。

    劣勢
    如果傾斜Key非常多,則另一側數據膨脹非常大,此方案不適用。而且此時對傾斜Key與非傾斜Key分開處理,需要掃描數據集兩遍,增加了開銷。

    大表隨機添加N種隨機前綴,小表擴大N倍

    原理

    如果出現數據傾斜的Key比較多,上一種方法將這些大量的傾斜Key分拆出來,意義不大。此時更適合直接對存在數據傾斜的數據集全部加上隨機前綴,然后對另外一個不存在嚴重數據傾斜的數據集整體與隨機前綴集作笛卡爾乘積(即將數據量擴大N倍)。
    spark random prefix

    案例

    這里給出示例代碼,讀者可參考上文中分拆出少數傾斜Key添加隨機前綴的方法,自行測試。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    public class SparkDataSkew {
    public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("ResolveDataSkewWithNAndRandom");
    sparkConf.set("spark.default.parallelism", parallelism + "");
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

    JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/")
    .mapToPair((String row) -> {
    String[] str = row.split(",");
    return new Tuple2<String, String>(str[0], str[1]);
    });

    JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/")
    .mapToPair((String row) -> {
    String[] str = row.split(",");
    return new Tuple2<String, String>(str[0], str[1]);
    });

    List<String> addList = new ArrayList<String>();
    for(int i = 1; i <=48; i++) {
    addList.add(i + "");
    }

    Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList);

    JavaPairRDD<String, String> leftRandomRDD = leftRDD.mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>(new Random().nextInt(48) + "," + tuple._1(), tuple._2()));

    JavaPairRDD<String, String> rightNewRDD = rightRDD
    .flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream()
    .map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2()))
    .collect(Collectors.toList())
    .iterator()
    );

    JavaPairRDD<String, String> joinRDD = leftRandomRDD
    .join(rightNewRDD, parallelism)
    .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2()));

    joinRDD.foreachPartition((Iterator<Tuple2<String, String>> iterator) -> {
    AtomicInteger atomicInteger = new AtomicInteger();
    iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet());
    });

    javaSparkContext.stop();
    javaSparkContext.close();
    }
    }

    總結

    適用場景
    一個數據集存在的傾斜Key比較多,另外一個數據集數據分布比較均勻。

    優勢
    對大部分場景都適用,效果不錯。

    劣勢
    需要將一個數據集整體擴大N倍,會增加資源消耗。

    總結

    對于數據傾斜,并無一個統一的一勞永逸的方法。更多的時候,是結合數據特點(數據集大小,傾斜Key的多少等)綜合使用上文所述的多種方法。

    Spark 系列文章

    郭俊 Jason wechat
    歡迎關注作者微信公眾號【大數據架構】
    您的贊賞將支持作者繼續原創分享
    速赢彩app 芜湖 | 图木舒克 | 五家渠 | 桐乡 | 长治 | 长兴 | 舟山 | 招远 | 牡丹江 | 鄢陵 | 余姚 | 漳州 | 烟台 | 梧州 | 肇庆 | 甘南 | 徐州 | 保亭 | 桂林 | 阿勒泰 | 乐山 | 荆门 | 甘肃兰州 | 抚顺 | 如东 | 雅安 | 商洛 | 盘锦 | 常州 | 甘肃兰州 | 辽源 | 岳阳 | 广西南宁 | 莱芜 | 延边 | 株洲 | 香港香港 | 乐平 | 厦门 | 任丘 | 金华 | 淮安 | 松原 | 丽水 | 湖州 | 安康 | 日土 | 包头 | 河源 | 大理 | 绵阳 | 吉安 | 霍邱 | 汉中 | 定安 | 桂林 | 三亚 | 淄博 | 中卫 | 大庆 | 昌吉 | 保定 | 亳州 | 铁岭 | 济南 | 武夷山 | 塔城 | 赵县 | 蓬莱 | 瑞安 | 汉中 | 通辽 | 锦州 | 琼中 | 广安 | 郴州 | 石狮 | 三亚 | 安阳 | 铜仁 | 邵阳 | 广汉 | 黑河 | 泗阳 | 吉林长春 | 博尔塔拉 | 榆林 | 西双版纳 | 项城 | 黔东南 | 喀什 | 普洱 | 株洲 | 滕州 | 桐城 | 澳门澳门 | 洛阳 | 秦皇岛 | 明港 | 白沙 | 和田 | 宣城 | 宜昌 | 海门 | 北海 | 龙岩 | 玉溪 | 深圳 | 台南 | 日土 | 赤峰 | 驻马店 | 锡林郭勒 |