• <menu id="gyiem"><menu id="gyiem"></menu></menu>
  • <menu id="gyiem"><code id="gyiem"></code></menu>

    Kafka設計解析(七)- Kafka Stream

    原創文章,轉載請務必將下面這段話置于文章開頭處。
    本文轉發自技術世界原文鏈接 http://www.luozeyang.com/kafka/kafka_stream/

    Kafka Stream背景

    Kafka Stream是什么

    Kafka Stream是Apache Kafka從0.10版本引入的一個新Feature。它是提供了對存儲于Kafka內的數據進行流式處理和分析的功能。

    Kafka Stream的特點如下:

    • Kafka Stream提供了一個非常簡單而輕量的Library,它可以非常方便地嵌入任意Java應用中,也可以任意方式打包和部署
    • 除了Kafka外,無任何外部依賴
    • 充分利用Kafka分區機制實現水平擴展和順序性保證
    • 通過可容錯的state store實現高效的狀態操作(如windowed join和aggregation)
    • 支持正好一次處理語義
    • 提供記錄級的處理能力,從而實現毫秒級的低延遲
    • 支持基于事件時間的窗口操作,并且可處理晚到的數據(late arrival of records)
    • 同時提供底層的處理原語Processor(類似于Storm的spout和bolt),以及高層抽象的DSL(類似于Spark的map/group/reduce)

    什么是流式計算

    一般流式計算會與批量計算相比較。在流式計算模型中,輸入是持續的,可以認為在時間上是無界的,也就意味著,永遠拿不到全量數據去做計算。同時,計算結果是持續輸出的,也即計算結果在時間上也是無界的。流式計算一般對實時性要求較高,同時一般是先定義目標計算,然后數據到來之后將計算邏輯應用于數據。同時為了提高計算效率,往往盡可能采用增量計算代替全量計算。


    Stream Processing

    批量處理模型中,一般先有全量數據集,然后定義計算邏輯,并將計算應用于全量數據。特點是全量計算,并且計算結果一次性全量輸出。


    Batch Processing

    為什么要有Kafka Stream

    當前已經有非常多的流式處理系統,最知名且應用最多的開源流式處理系統有Spark Streaming和Apache Storm。Apache Storm發展多年,應用廣泛,提供記錄級別的處理能力,當前也支持SQL on Stream。而Spark Streaming基于Apache Spark,可以非常方便與圖計算,SQL處理等集成,功能強大,對于熟悉其它Spark應用開發的用戶而言使用門檻低。另外,目前主流的Hadoop發行版,如MapR,Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。

    既然Apache Spark與Apache Storm擁用如此多的優勢,那為何還需要Kafka Stream呢?筆者認為主要有如下原因。

    第一,Spark和Storm都是流式處理框架,而Kafka Stream提供的是一個基于Kafka的流式處理類庫。框架要求開發者按照特定的方式去開發邏輯部分,供框架調用。開發者很難了解框架的具體運行方式,從而使得調試成本高,并且使用受限。而Kafka Stream作為流式處理類庫,直接提供具體的類給開發者調用,整個應用的運行方式主要由開發者控制,方便使用和調試。


    Library vs. Framework

    第二,雖然Cloudera與Hortonworks方便了Storm和Spark的部署,但是這些框架的部署仍然相對復雜。而Kafka Stream作為類庫,可以非常方便的嵌入應用程序中,它對應用的打包和部署基本沒有任何要求。更為重要的是,Kafka Stream充分利用了Kafka的分區機制Consumer的Rebalance機制,使得Kafka Stream可以非常方便的水平擴展,并且各個實例可以使用不同的部署方式。具體來說,每個運行Kafka Stream的應用程序實例都包含了Kafka Consumer實例,多個同一應用的實例之間并行處理數據集。而不同實例之間的部署方式并不要求一致,比如部分實例可以運行在Web容器中,部分實例可運行在Docker或Kubernetes中。

    第三,就流式處理系統而言,基本都支持Kafka作為數據源。例如Storm具有專門的kafka-spout,而Spark也提供專門的spark-streaming-kafka模塊。事實上,Kafka基本上是主流的流式處理系統的標準數據源。換言之,大部分流式系統中都已部署了Kafka,此時使用Kafka Stream的成本非常低。

    第四,使用Storm或Spark Streaming時,需要為框架本身的進程預留資源,如Storm的supervisor和Spark on YARN的node manager。即使對于應用實例而言,框架本身也會占用部分資源,如Spark Streaming需要為shuffle和storage預留內存。

    第五,由于Kafka本身提供數據持久化,因此Kafka Stream提供滾動部署和滾動升級以及重新計算的能力。

    第六,由于Kafka Consumer Rebalance機制,Kafka Stream可以在線動態調整并行度。

    Kafka Stream架構

    Kafka Stream整體架構

    Kafka Stream的整體架構圖如下所示。


    Kafka Stream Architecture

    目前(Kafka 0.11.0.0)Kafka Stream的數據源只能如上圖所示是Kafka。但是處理結果并不一定要如上圖所示輸出到Kafka。實際上KStream和Ktable的實例化都需要指定Topic。

    1
    2
    3
    KStream<String, String> stream = builder.stream("words-stream");

    KTable<String, String> table = builder.table("words-table", "words-store");

    另外,上圖中的Consumer和Producer并不需要開發者在應用中顯示實例化,而是由Kafka Stream根據參數隱式實例化和管理,從而降低了使用門檻。開發者只需要專注于開發核心業務邏輯,也即上圖中Task內的部分。

    Processor Topology

    基于Kafka Stream的流式應用的業務邏輯全部通過一個被稱為Processor Topology的地方執行。它與Storm的Topology和Spark的DAG類似,都定義了數據在各個處理單元(在Kafka Stream中被稱作Processor)間的流動方式,或者說定義了數據的處理邏輯。

    下面是一個Processor的示例,它實現了Word Count功能,并且每秒輸出一次結果。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    public class WordCountProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, Integer> kvStore;

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
    this.context = context;
    this.context.schedule(1000);
    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
    }

    @Override
    public void process(String key, String value) {
    Stream.of(value.toLowerCase().split(" ")).forEach((String word) -> {
    Optional<Integer> counts = Optional.ofNullable(kvStore.get(word));
    int count = counts.map(wordcount -> wordcount + 1).orElse(1);
    kvStore.put(word, count);
    });
    }

    @Override
    public void punctuate(long timestamp) {
    KeyValueIterator<String, Integer> iterator = this.kvStore.all();
    iterator.forEachRemaining(entry -> {
    context.forward(entry.key, entry.value);
    this.kvStore.delete(entry.key);
    });
    context.commit();
    }

    @Override
    public void close() {
    this.kvStore.close();
    }

    }

    從上述代碼中可見

    • process定義了對每條記錄的處理邏輯,也印證了Kafka可具有記錄級的數據處理能力。
    • context.scheduler定義了punctuate被執行的周期,從而提供了實現窗口操作的能力。
    • context.getStateStore提供的狀態存儲為有狀態計算(如窗口,聚合)提供了可能。

    Kafka Stream并行模型

    Kafka Stream的并行模型中,最小粒度為Task,而每個Task包含一個特定子Topology的所有Processor。因此每個Task所執行的代碼完全一樣,唯一的不同在于所處理的數據集互補。這一點跟Storm的Topology完全不一樣。Storm的Topology的每一個Task只包含一個Spout或Bolt的實例。因此Storm的一個Topology內的不同Task之間需要通過網絡通信傳遞數據,而Kafka Stream的Task包含了完整的子Topology,所以Task之間不需要傳遞數據,也就不需要網絡通信。這一點降低了系統復雜度,也提高了處理效率。

    如果某個Stream的輸入Topic有多個(比如2個Topic,1個Partition數為4,另一個Partition數為3),則總的Task數等于Partition數最多的那個Topic的Partition數(max(4,3)=4)。這是因為Kafka Stream使用了Consumer的Rebalance機制,每個Partition對應一個Task。

    下圖展示了在一個進程(Instance)中以2個Topic(Partition數均為4)為數據源的Kafka Stream應用的并行模型。從圖中可以看到,由于Kafka Stream應用的默認線程數為1,所以4個Task全部在一個線程中運行。


    1 thread

    為了充分利用多線程的優勢,可以設置Kafka Stream的線程數。下圖展示了線程數為2時的并行模型。


    2 threads

    前文有提到,Kafka Stream可被嵌入任意Java應用(理論上基于JVM的應用都可以)中,下圖展示了在同一臺機器的不同進程中同時啟動同一Kafka Stream應用時的并行模型。注意,這里要保證兩個進程的StreamsConfig.APPLICATION_ID_CONFIG完全一樣。因為Kafka Stream將APPLICATION_ID_CONFIG作為隱式啟動的Consumer的Group ID。只有保證APPLICATION_ID_CONFIG相同,才能保證這兩個進程的Consumer屬于同一個Group,從而可以通過Consumer Rebalance機制拿到互補的數據集。


    2 instances

    既然實現了多進程部署,可以以同樣的方式實現多機器部署。該部署方式也要求所有進程的APPLICATION_ID_CONFIG完全一樣。從圖上也可以看到,每個實例中的線程數并不要求一樣。但是無論如何部署,Task總數總會保證一致。


    2 servers

    注意:Kafka Stream的并行模型,非常依賴于《Kafka設計解析(一)- Kafka背景及架構介紹》一文中介紹的Kafka分區機制和《Kafka設計解析(四)- Kafka Consumer設計解析》中介紹的Consumer的Rebalance機制。強烈建議不太熟悉這兩種機制的朋友,先行閱讀這兩篇文章。

    這里對比一下Kafka Stream的Processor Topology與Storm的Topology。

    • Storm的Topology由Spout和Bolt組成,Spout提供數據源,而Bolt提供計算和數據導出。Kafka Stream的Processor Topology完全由Processor組成,因為它的數據固定由Kafka的Topic提供。
    • Storm的不同Bolt運行在不同的Executor中,很可能位于不同的機器,需要通過網絡通信傳輸數據。而Kafka Stream的Processor Topology的不同Processor完全運行于同一個Task中,也就完全處于同一個線程,無需網絡通信。
    • Storm的Topology可以同時包含Shuffle部分和非Shuffle部分,并且往往一個Topology就是一個完整的應用。而Kafka Stream的一個物理Topology只包含非Shuffle部分,而Shuffle部分需要通過through操作顯示完成,該操作將一個大的Topology分成了2個子Topology。
    • Storm的Topology內,不同Bolt/Spout的并行度可以不一樣,而Kafka Stream的子Topology內,所有Processor的并行度完全一樣。
    • Storm的一個Task只包含一個Spout或者Bolt的實例,而Kafka Stream的一個Task包含了一個子Topology的所有Processor。

    KTable vs. KStream

    KTable和KStream是Kafka Stream中非常重要的兩個概念,它們是Kafka實現各種語義的基礎。因此這里有必要分析下二者的區別。

    KStream是一個數據流,可以認為所有記錄都通過Insert only的方式插入進這個數據流里。而KTable代表一個完整的數據集,可以理解為數據庫中的表。由于每條記錄都是Key-Value對,這里可以將Key理解為數據庫中的Primary Key,而Value可以理解為一行記錄。可以認為KTable中的數據都是通過Update only的方式進入的。也就意味著,如果KTable對應的Topic中新進入的數據的Key已經存在,那么從KTable只會取出同一Key對應的最后一條數據,相當于新的數據更新了舊的數據。

    以下圖為例,假設有一個KStream和KTable,基于同一個Topic創建,并且該Topic中包含如下圖所示5條數據。此時遍歷KStream將得到與Topic內數據完全一樣的所有5條數據,且順序不變。而此時遍歷KTable時,因為這5條記錄中有3個不同的Key,所以將得到3條記錄,每個Key對應最新的值,并且這三條數據之間的順序與原來在Topic中的順序保持一致。這一點與Kafka的日志compact相同。


    KStream vs. KTable

    此時如果對該KStream和KTable分別基于key做Group,對Value進行Sum,得到的結果將會不同。對KStream的計算結果是<Jack,4><Lily,7><Mike,4>。而對Ktable的計算結果是<Mike,4><Jack,3><Lily,5>

    State store

    流式處理中,部分操作是無狀態的,例如過濾操作(Kafka Stream DSL中用filer方法實現)。而部分操作是有狀態的,需要記錄中間狀態,如Window操作和聚合計算。State store被用來存儲中間狀態。它可以是一個持久化的Key-Value存儲,也可以是內存中的HashMap,或者是數據庫。Kafka提供了基于Topic的狀態存儲。

    Topic中存儲的數據記錄本身是Key-Value形式的,同時Kafka的log compaction機制可對歷史數據做compact操作,保留每個Key對應的最后一個Value,從而在保證Key不丟失的前提下,減少總數據量,從而提高查詢效率。

    構造KTable時,需要指定其state store name。默認情況下,該名字也即用于存儲該KTable的狀態的Topic的名字,遍歷KTable的過程,實際就是遍歷它對應的state store,或者說遍歷Topic的所有key,并取每個Key最新值的過程。為了使得該過程更加高效,默認情況下會對該Topic進行compact操作。

    另外,除了KTable,所有狀態計算,都需要指定state store name,從而記錄中間狀態。

    Kafka Stream如何解決流式系統中關鍵問題

    時間

    在流式數據處理中,時間是數據的一個非常重要的屬性。從Kafka 0.10開始,每條記錄除了Key和Value外,還增加了timestamp屬性。目前Kafka Stream支持三種時間

    • 事件發生時間。事件發生的時間,包含在數據記錄中。發生時間由Producer在構造ProducerRecord時指定。并且需要Broker或者Topic將message.timestamp.type設置為CreateTime(默認值)才能生效。
    • 消息接收時間,也即消息存入Broker的時間。當Broker或Topic將message.timestamp.type設置為LogAppendTime時生效。此時Broker會在接收到消息后,存入磁盤前,將其timestamp屬性值設置為當前機器時間。一般消息接收時間比較接近于事件發生時間,部分場景下可代替事件發生時間。
    • 消息處理時間,也即Kafka Stream處理消息時的時間。

    注:Kafka Stream允許通過實現org.apache.kafka.streams.processor.TimestampExtractor接口自定義記錄時間。

    窗口

    前文提到,流式數據是在時間上無界的數據。而聚合操作只能作用在特定的數據集,也即有界的數據集上。因此需要通過某種方式從無界的數據集上按特定的語義選取出有界的數據。窗口是一種非常常用的設定計算邊界的方式。不同的流式處理系統支持的窗口類似,但不盡相同。

    Kafka Stream支持的窗口如下。

    1. Hopping Time Window 該窗口定義如下圖所示。它有兩個屬性,一個是Window size,一個是Advance interval。Window size指定了窗口的大小,也即每次計算的數據集的大小。而Advance interval定義輸出的時間間隔。一個典型的應用場景是,每隔5秒鐘輸出一次過去1個小時內網站的PV或者UV。


      Hopping Time Window
    2. Tumbling Time Window該窗口定義如下圖所示。可以認為它是Hopping Time Window的一種特例,也即Window size和Advance interval相等。它的特點是各個Window之間完全不相交。


      Tumbling Time Window
    3. Sliding Window該窗口只用于2個KStream進行Join計算時。該窗口的大小定義了Join兩側KStream的數據記錄被認為在同一個窗口的最大時間差。假設該窗口的大小為5秒,則參與Join的2個KStream中,記錄時間差小于5的記錄被認為在同一個窗口中,可以進行Join計算。

    4. Session Window該窗口用于對Key做Group后的聚合操作中。它需要對Key做分組,然后對組內的數據根據業務需求定義一個窗口的起始點和結束點。一個典型的案例是,希望通過Session Window計算某個用戶訪問網站的時間。對于一個特定的用戶(用Key表示)而言,當發生登錄操作時,該用戶(Key)的窗口即開始,當發生退出操作或者超時時,該用戶(Key)的窗口即結束。窗口結束時,可計算該用戶的訪問時間或者點擊次數等。

    Join

    Kafka Stream由于包含KStream和Ktable兩種數據集,因此提供如下Join計算

    • KTable Join KTable 結果仍為KTable。任意一邊有更新,結果KTable都會更新。
    • KStream Join KStream 結果為KStream。必須帶窗口操作,否則會造成Join操作一直不結束。
    • KStream Join KTable / GlobalKTable 結果為KStream。只有當KStream中有新數據時,才會觸發Join計算并輸出結果。KStream無新數據時,KTable的更新并不會觸發Join計算,也不會輸出數據。并且該更新只對下次Join生效。一個典型的使用場景是,KStream中的訂單信息與KTable中的用戶信息做關聯計算。

    對于Join操作,如果要得到正確的計算結果,需要保證參與Join的KTable或KStream中Key相同的數據被分配到同一個Task。具體方法是

    • 參與Join的KTable或KStream的Key類型相同(實際上,業務含意也應該相同)
    • 參與Join的KTable或KStream對應的Topic的Partition數相同
    • Partitioner策略的最終結果等效(實現不需要完全一樣,只要效果一樣即可),也即Key相同的情況下,被分配到ID相同的Partition內

    如果上述條件不滿足,可通過調用如下方法使得它滿足上述條件。

    1
    KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic)

    聚合與亂序處理

    聚合操作可應用于KStream和KTable。當聚合發生在KStream上時必須指定窗口,從而限定計算的目標數據集。

    需要說明的是,聚合操作的結果肯定是KTable。因為KTable是可更新的,可以在晚到的數據到來時(也即發生數據亂序時)更新結果KTable。

    這里舉例說明。假設對KStream以5秒為窗口大小,進行Tumbling Time Window上的Count操作。并且KStream先后出現時間為1秒, 3秒, 5秒的數據,此時5秒的窗口已達上限,Kafka Stream關閉該窗口,觸發Count操作并將結果3輸出到KTable中(假設該結果表示為<1-5,3>)。若1秒后,又收到了時間為2秒的記錄,由于1-5秒的窗口已關閉,若直接拋棄該數據,則可認為之前的結果<1-5,3>不準確。而如果直接將完整的結果<1-5,4>輸出到KStream中,則KStream中將會包含該窗口的2條記錄,<1-5,3>, <1-5,4>,也會存在骯數據。因此Kafka Stream選擇將聚合結果存于KTable中,此時新的結果<1-5,4>會替代舊的結果<1-5,3>。用戶可得到完整的正確的結果。

    這種方式保證了數據準確性,同時也提高了容錯性。

    但需要說明的是,Kafka Stream并不會對所有晚到的數據都重新計算并更新結果集,而是讓用戶設置一個retention period,將每個窗口的結果集在內存中保留一定時間,該窗口內的數據晚到時,直接合并計算,并更新結果KTable。超過retention period后,該窗口結果將從內存中刪除,并且晚到的數據即使落入窗口,也會被直接丟棄。

    容錯

    Kafka Stream從如下幾個方面進行容錯

    • 高可用的Partition保證無數據丟失。每個Task計算一個Partition,而Kafka數據復制機制保證了Partition內數據的高可用性,故無數據丟失風險。同時由于數據是持久化的,即使任務失敗,依然可以重新計算。
    • 狀態存儲實現快速故障恢復和從故障點繼續處理。對于Join和聚合及窗口等有狀態計算,狀態存儲可保存中間狀態。即使發生Failover或Consumer Rebalance,仍然可以通過狀態存儲恢復中間狀態,從而可以繼續從Failover或Consumer Rebalance前的點繼續計算。
    • KTable與retention period提供了對亂序數據的處理能力。

    Kafka Stream應用示例

    下面結合一個案例來講解如何開發Kafka Stream應用。本例完整代碼可從作者Github獲取。

    訂單KStream(名為orderStream),底層Topic的Partition數為3,Key為用戶名,Value包含用戶名,商品名,訂單時間,數量。用戶KTable(名為userTable),底層Topic的Partition數為3,Key為用戶名,Value包含性別,地址和年齡。商品KTable(名為itemTable),底層Topic的Partition數為6,Key為商品名,價格,種類和產地。現在希望計算每小時購買產地與自己所在地相同的用戶總數。

    首先由于希望使用訂單時間,而它包含在orderStream的Value中,需要通過提供一個實現TimestampExtractor接口的類從orderStream對應的Topic中抽取出訂單時間。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public class OrderTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
    if(record instanceof Order) {
    return ((Order)record).getTS();
    } else {
    return 0;
    }
    }
    }

    接著通過將orderStream與userTable進行Join,來獲取訂單用戶所在地。由于二者對應的Topic的Partition數相同,且Key都為用戶名,再假設Producer往這兩個Topic寫數據時所用的Partitioner實現相同,則此時上文所述Join條件滿足,可直接進行Join。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    orderUserStream = orderStream
    .leftJoin(userTable,
    // 該lamda表達式定義了如何從orderStream與userTable生成結果集的Value
    (Order order, User user) -> OrderUser.fromOrderUser(order, user),
    // 結果集Key序列化方式
    Serdes.String(),
    // 結果集Value序列化方式
    SerdesFactory.serdFrom(Order.class))
    .filter((String userName, OrderUser orderUser) -> orderUser.userAddress != null)

    從上述代碼中,可以看到,Join時需要指定如何從參與Join雙方的記錄生成結果記錄的Value。Key不需要指定,因為結果記錄的Key與Join Key相同,故無須指定。Join結果存于名為orderUserStream的KStream中。

    接下來需要將orderUserStream與itemTable進行Join,從而獲取商品產地。此時orderUserStream的Key仍為用戶名,而itemTable對應的Topic的Key為產品名,并且二者的Partition數不一樣,因此無法直接Join。此時需要通過through方法,對其中一方或雙方進行重新分區,使得二者滿足Join條件。這一過程相當于Spark的Shuffle過程和Storm的FieldGrouping。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    orderUserStrea
    .through(
    // Key的序列化方式
    Serdes.String(),
    // Value的序列化方式
    SerdesFactory.serdFrom(OrderUser.class),
    // 重新按照商品名進行分區,具體取商品名的哈希值,然后對分區數取模
    (String key, OrderUser orderUser, int numPartitions) -> (orderUser.getItemName().hashCode() & 0x7FFFFFFF) % numPartitions,
    "orderuser-repartition-by-item")
    .leftJoin(itemTable, (OrderUser orderUser, Item item) -> OrderUserItem.fromOrderUser(orderUser, item), Serdes.String(), SerdesFactory.serdFrom(OrderUser.class))

    從上述代碼可見,through時需要指定Key的序列化器,Value的序列化器,以及分區方式和結果集所在的Topic。這里要注意,該Topic(orderuser-repartition-by-item)的Partition數必須與itemTable對應Topic的Partition數相同,并且through使用的分區方法必須與iteamTable對應Topic的分區方式一樣。經過這種through操作,orderUserStream與itemTable滿足了Join條件,可直接進行Join。

    總結

    • Kafka Stream的并行模型完全基于Kafka的分區機制和Rebalance機制,實現了在線動態調整并行度
    • 同一Task包含了一個子Topology的所有Processor,使得所有處理邏輯都在同一線程內完成,避免了不必的網絡通信開銷,從而提高了效率。
    • through方法提供了類似Spark的Shuffle機制,為使用不同分區策略的數據提供了Join的可能
    • log compact提高了基于Kafka的state store的加載效率
    • state store為狀態計算提供了可能
    • 基于offset的計算進度管理以及基于state store的中間狀態管理為發生Consumer rebalance或Failover時從斷點處繼續處理提供了可能,并為系統容錯性提供了保障
    • KTable的引入,使得聚合計算擁用了處理亂序問題的能力

    Kafka系列文章

    郭俊 Jason wechat
    歡迎關注作者微信公眾號【大數據架構】
    您的贊賞將支持作者繼續原創分享
    速赢彩app 温岭 | 神农架 | 嘉兴 | 义乌 | 屯昌 | 武夷山 | 阿里 | 黄南 | 宜都 | 中山 | 湛江 | 海西 | 曹县 | 双鸭山 | 永康 | 济源 | 三河 | 日土 | 文昌 | 商丘 | 果洛 | 延安 | 淮北 | 白城 | 永新 | 澳门澳门 | 辽阳 | 龙岩 | 铜陵 | 平凉 | 诸城 | 泗阳 | 余姚 | 霍邱 | 宜春 | 金昌 | 衡阳 | 柳州 | 新沂 | 阜阳 | 巢湖 | 内蒙古呼和浩特 | 定州 | 保山 | 醴陵 | 台山 | 长垣 | 长葛 | 乐山 | 淄博 | 燕郊 | 保亭 | 阿拉善盟 | 黑龙江哈尔滨 | 大兴安岭 | 海拉尔 | 鹰潭 | 新疆乌鲁木齐 | 铜陵 | 库尔勒 | 周口 | 忻州 | 玉林 | 巴音郭楞 | 神木 | 大兴安岭 | 庆阳 | 玉环 | 保山 | 西藏拉萨 | 库尔勒 | 柳州 | 葫芦岛 | 景德镇 | 嘉善 | 鹤岗 | 芜湖 | 锡林郭勒 | 石河子 | 来宾 | 黄石 | 兴安盟 | 绵阳 | 潮州 | 图木舒克 | 咸阳 | 燕郊 | 资阳 | 株洲 | 温州 | 铜陵 | 大同 | 广饶 | 包头 | 金昌 | 澄迈 | 东阳 | 宣城 | 沛县 | 扬州 | 陵水 | 临汾 | 黄南 | 白城 | 大庆 | 义乌 | 中山 | 双鸭山 | 玉树 | 黔东南 | 灵宝 | 晋城 | 六盘水 |