• <menu id="gyiem"><menu id="gyiem"></menu></menu>
  • <menu id="gyiem"><code id="gyiem"></code></menu>

    Kafka設計解析(三)- Kafka High Availability (下)

    原創文章,轉載請務必將下面這段話置于文章開頭處。(已授權InfoQ中文站發布
    本文轉發自技術世界原文鏈接 http://www.luozeyang.com/2015/06/08/KafkaColumn3

    摘要

      本文在上篇文章基礎上,更加深入講解了Kafka的HA機制,主要闡述了HA相關各種場景,如Broker failover,Controller failover,Topic創建/刪除,Broker啟動,Follower從Leader fetch數據等詳細處理過程。同時介紹了Kafka提供的與Replication相關的工具,如重新分配Partition等。

    Broker Failover過程

    Controller對Broker failure的處理過程

    1. Controller在Zookeeper的/brokers/ids節點上注冊Watch。一旦有Broker宕機(本文用宕機代表任何讓Kafka認為其Broker die的情景,包括但不限于機器斷電,網絡不可用,GC導致的Stop The World,進程crash等),其在Zookeeper對應的Znode會自動被刪除,Zookeeper會fire Controller注冊的Watch,Controller即可獲取最新的幸存的Broker列表。
    2. Controller決定set_p,該集合包含了宕機的所有Broker上的所有Partition。
    3. 對set_p中的每一個Partition:
        3.1 從/brokers/topics/[topic]/partitions/[partition]/state讀取該Partition當前的ISR。
        3.2 決定該Partition的新Leader。如果當前ISR中有至少一個Replica還幸存,則選擇其中一個作為新Leader,新的ISR則包含當前ISR中所有幸存的Replica。否則選擇該Partition中任意一個幸存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的數據丟失)。如果該Partition的所有Replica都宕機了,則將新的Leader設置為-1。
        3.3 將新的Leader,ISR和新的leader_epochcontroller_epoch寫入/brokers/topics/[topic]/partitions/[partition]/state。注意,該操作只有Controller版本在3.1至3.3的過程中無變化時才會執行,否則跳轉到3.1。
    4. 直接通過RPC向set_p相關的Broker發送LeaderAndISRRequest命令。Controller可以在一個RPC操作中發送多個命令從而提高效率。
        Broker failover順序圖如下所示。
      broker failover sequence diagram

      LeaderAndIsrRequest結構如下
    LeaderAndIsrRequest

      LeaderAndIsrResponse結構如下
    LeaderAndIsrResponse

    創建/刪除Topic

    1. Controller在Zookeeper的/brokers/topics節點上注冊Watch,一旦某個Topic被創建或刪除,則Controller會通過Watch得到新創建/刪除的Topic的Partition/Replica分配。
    2. 對于刪除Topic操作,Topic工具會將該Topic名字存于/admin/delete_topics。若delete.topic.enable為true,則Controller注冊在/admin/delete_topics上的Watch被fire,Controller通過回調向對應的Broker發送StopReplicaRequest;若為false則Controller不會在/admin/delete_topics上注冊Watch,也就不會對該事件作出反應,此時Topic操作只被記錄而不會被執行。
    3. 對于創建Topic操作,Controller從/brokers/ids讀取當前所有可用的Broker列表,對于set_p中的每一個Partition:
        3.1 從分配給該Partition的所有Replica(稱為AR)中任選一個可用的Broker作為新的Leader,并將AR設置為新的ISR(因為該Topic是新創建的,所以AR中所有的Replica都沒有數據,可認為它們都是同步的,也即都在ISR中,任意一個Replica都可作為Leader)
        3.2 將新的Leader和ISR寫入/brokers/topics/[topic]/partitions/[partition]
    4. 直接通過RPC向相關的Broker發送LeaderAndISRRequest。
        創建Topic順序圖如下所示。
      create topic sequence diagram

    Broker響應請求流程

      Broker通過kafka.network.SocketServer及相關模塊接受各種請求并作出響應。整個網絡通信模塊基于Java NIO開發,并采用Reactor模式,其中包含1個Acceptor負責接受客戶請求,N個Processor負責讀寫數據,M個Handler處理業務邏輯。
      Acceptor的主要職責是監聽并接受客戶端(請求發起方,包括但不限于Producer,Consumer,Controller,Admin Tool)的連接請求,并建立和客戶端的數據傳輸通道,然后為該客戶端指定一個Processor,至此它對該客戶端該次請求的任務就結束了,它可以去響應下一個客戶端的連接請求了。其核心代碼如下。
    Kafka SocketServer Acceptor_run
      
      Processor主要負責從客戶端讀取數據并將響應返回給客戶端,它本身并不處理具體的業務邏輯,并且其內部維護了一個隊列來保存分配給它的所有SocketChannel。Processor的run方法會循環從隊列中取出新的SocketChannel并將其SelectionKey.OP_READ注冊到selector上,然后循環處理已就緒的讀(請求)和寫(響應)。Processor讀取完數據后,將其封裝成Request對象并將其交給RequestChannel。
      RequestChannel是Processor和KafkaRequestHandler交換數據的地方,它包含一個隊列requestQueue用來存放Processor加入的Request,KafkaRequestHandler會從里面取出Request來處理;同時它還包含一個respondQueue,用來存放KafkaRequestHandler處理完Request后返還給客戶端的Response。
      Processor會通過processNewResponses方法依次將requestChannel中responseQueue保存的Response取出,并將對應的SelectionKey.OP_WRITE事件注冊到selector上。當selector的select方法返回時,對檢測到的可寫通道,調用write方法將Response返回給客戶端。
      KafkaRequestHandler循環從RequestChannel中取Request并交給kafka.server.KafkaApis處理具體的業務邏輯。

    LeaderAndIsrRequest響應過程

      對于收到的LeaderAndIsrRequest,Broker主要通過ReplicaManager的becomeLeaderOrFollower處理,流程如下:

    1. 若請求中controllerEpoch小于當前最新的controllerEpoch,則直接返回ErrorMapping.StaleControllerEpochCode。
    2. 對于請求中partitionStateInfos中的每一個元素,即((topic, partitionId), partitionStateInfo):
        2.1 若partitionStateInfo中的leader epoch大于當前ReplicManager中存儲的(topic, partitionId)對應的partition的leader epoch,則:
          2.1.1 若當前brokerid(或者說replica id)在partitionStateInfo中,則將該partition及partitionStateInfo存入一個名為partitionState的HashMap中
          2.1.2否則說明該Broker不在該Partition分配的Replica list中,將該信息記錄于log中
        2.2否則將相應的Error code(ErrorMapping.StaleLeaderEpochCode)存入Response中
    3. 篩選出partitionState中Leader與當前Broker ID相等的所有記錄存入partitionsTobeLeader中,其它記錄存入partitionsToBeFollower中。
    4. 若partitionsTobeLeader不為空,則對其執行makeLeaders方。
    5. 若partitionsToBeFollower不為空,則對其執行makeFollowers方法。
    6. 若highwatermak線程還未啟動,則將其啟動,并將hwThreadInitialized設為true。
    7. 關閉所有Idle狀態的Fetcher。

      LeaderAndIsrRequest處理過程如下圖所示
    LeaderAndIsrRequest Flow Chart

    Broker啟動過程

      Broker啟動后首先根據其ID在Zookeeper的/brokers/idszonde下創建臨時子節點(Ephemeral node),創建成功后Controller的ReplicaStateMachine注冊其上的Broker Change Watch會被fire,從而通過回調KafkaController.onBrokerStartup方法完成以下步驟:

    1. 向所有新啟動的Broker發送UpdateMetadataRequest,其定義如下。
      UpdateMetadataRequest
    2. 將新啟動的Broker上的所有Replica設置為OnlineReplica狀態,同時這些Broker會為這些Partition啟動high watermark線程。
    3. 通過partitionStateMachine觸發OnlinePartitionStateChange。

    Controller Failover

    Controller也需要Failover。每個Broker都會在Controller Path (/controller)上注冊一個Watch。當前Controller失敗時,對應的Controller Path會自動消失(因為它是Ephemeral Node),此時該Watch被fire,所有“活”著的Broker都會去競選成為新的Controller(創建新的Controller Path),但是只會有一個競選成功(這點由Zookeeper保證)。競選成功者即為新的Leader,競選失敗者則重新在新的Controller Path上注冊Watch。因為Zookeeper的Watch是一次性的,被fire一次之后即失效,所以需要重新注冊。

    Broker成功競選為新Controller后會觸發KafkaController.onControllerFailover方法,并在該方法中完成如下操作:

    1. 讀取并增加Controller Epoch。
    2. 在ReassignedPartitions Path(/admin/reassign_partitions)上注冊Watch。
    3. 在PreferredReplicaElection Path(/admin/preferred_replica_election)上注冊Watch。
    4. 通過partitionStateMachine在Broker Topics Patch(/brokers/topics)上注冊Watch。
    5. delete.topic.enable設置為true(默認值是false),則partitionStateMachine在Delete Topic Patch(/admin/delete_topics)上注冊Watch。
    6. 通過replicaStateMachine在Broker Ids Patch(/brokers/ids)上注冊Watch。
    7. 初始化ControllerContext對象,設置當前所有Topic,“活”著的Broker列表,所有Partition的Leader及ISR等。
    8. 啟動replicaStateMachine和partitionStateMachine。
    9. 將brokerState狀態設置為RunningAsController。
    10. 將每個Partition的Leadership信息發送給所有“活”著的Broker。
    11. auto.leader.rebalance.enable配置為true(默認值是true),則啟動partition-rebalance線程。
    12. delete.topic.enable設置為true且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應的Topic。

    Partition重新分配

      管理工具發出重新分配Partition請求后,會將相應信息寫到/admin/reassign_partitions上,而該操作會觸發ReassignedPartitionsIsrChangeListener,從而通過執行回調函數KafkaController.onPartitionReassignment來完成以下操作:

    1. 將Zookeeper中的AR(Current Assigned Replicas)更新為OAR(Original list of replicas for partition) + RAR(Reassigned replicas)。
    2. 強制更新Zookeeper中的leader epoch,向AR中的每個Replica發送LeaderAndIsrRequest。
    3. 將RAR - OAR中的Replica設置為NewReplica狀態。
    4. 等待直到RAR中所有的Replica都與其Leader同步。
    5. 將RAR中所有的Replica都設置為OnlineReplica狀態。
    6. 將Cache中的AR設置為RAR。
    7. 若Leader不在RAR中,則從RAR中重新選舉出一個新的Leader并發送LeaderAndIsrRequest。若新的Leader不是從RAR中選舉而出,則還要增加Zookeeper中的leader epoch。
    8. 將OAR - RAR中的所有Replica設置為OfflineReplica狀態,該過程包含兩部分。第一,將Zookeeper上ISR中的OAR - RAR移除并向Leader發送LeaderAndIsrRequest從而通知這些Replica已經從ISR中移除;第二,向OAR - RAR中的Replica發送StopReplicaRequest從而停止不再分配給該Partition的Replica。
    9. 將OAR - RAR中的所有Replica設置為NonExistentReplica狀態從而將其從磁盤上刪除。
    10. 將Zookeeper中的AR設置為RAR。
    11. 刪除/admin/reassign_partition
        
      注意:最后一步才將Zookeeper中的AR更新,因為這是唯一一個持久存儲AR的地方,如果Controller在這一步之前crash,新的Controller仍然能夠繼續完成該過程。
        以下是Partition重新分配的案例,OAR = {1,2,3},RAR = {4,5,6},Partition重新分配過程中Zookeeper中的AR和Leader/ISR路徑如下
    AR leader/isr Step
    {1,2,3} 1/{1,2,3} (initial state)
    {1,2,3,4,5,6} 1/{1,2,3} (step 2)
    {1,2,3,4,5,6} 1/{1,2,3,4,5,6} (step 4)
    {1,2,3,4,5,6} 4/{1,2,3,4,5,6} (step 7)
    {1,2,3,4,5,6} 4/{4,5,6} (step 8)
    {4,5,6} 4/{4,5,6} (step 10)

    Follower從Leader Fetch數據

      Follower通過向Leader發送FetchRequest獲取消息,FetchRequest結構如下
    FetchRequest
      從FetchRequest的結構可以看出,每個Fetch請求都要指定最大等待時間和最小獲取字節數,以及由TopicAndPartition和PartitionFetchInfo構成的Map。實際上,Follower從Leader數據和Consumer從Broker Fetch數據,都是通過FetchRequest請求完成,所以在FetchRequest結構中,其中一個字段是clientID,并且其默認值是ConsumerConfig.DefaultClientId。
      
      Leader收到Fetch請求后,Kafka通過KafkaApis.handleFetchRequest響應該請求,響應過程如下:

    1. replicaManager根據請求讀出數據存入dataRead中。
    2. 如果該請求來自Follower則更新其相應的LEO(log end offset)以及相應Partition的High Watermark
    3. 根據dataRead算出可讀消息長度(單位為字節)并存入bytesReadable中。
    4. 滿足下面4個條件中的1個,則立即將相應的數據返回
    • Fetch請求不希望等待,即fetchRequest.macWait <= 0
    • Fetch請求不要求一定能取到消息,即fetchRequest.numPartitions <= 0,也即requestInfo為空
    • 有足夠的數據可供返回,即bytesReadable >= fetchRequest.minBytes
    • 讀取數據時發生異常
    1. 若不滿足以上4個條件,FetchRequest將不會立即返回,并將該請求封裝成DelayedFetch。檢查該DeplayedFetch是否滿足,若滿足則返回請求,否則將該請求加入Watch列表

      Leader通過以FetchResponse的形式將消息返回給Follower,FetchResponse結構如下
    FetchResponse

    #Replication工具

    Topic Tool

      $KAFKA_HOME/bin/kafka-topics.sh,該工具可用于創建、刪除、修改、查看某個Topic,也可用于列出所有Topic。另外,該工具還可修改以下配置。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    unclean.leader.election.enable
    delete.retention.ms
    segment.jitter.ms
    retention.ms
    flush.ms
    segment.bytes
    flush.messages
    segment.ms
    retention.bytes
    cleanup.policy
    segment.index.bytes
    min.cleanable.dirty.ratio
    max.message.bytes
    file.delete.delay.ms
    min.insync.replicas
    index.interval.bytes

    Replica Verification Tool

      $KAFKA_HOME/bin/kafka-replica-verification.sh,該工具用來驗證所指定的一個或多個Topic下每個Partition對應的所有Replica是否都同步。可通過topic-white-list這一參數指定所需要驗證的所有Topic,支持正則表達式。   

    Preferred Replica Leader Election Tool

    用途
      有了Replication機制后,每個Partition可能有多個備份。某個Partition的Replica列表叫作AR(Assigned Replicas),AR中的第一個Replica即為“Preferred Replica”。創建一個新的Topic或者給已有Topic增加Partition時,Kafka保證Preferred Replica被均勻分布到集群中的所有Broker上。理想情況下,Preferred Replica會被選為Leader。以上兩點保證了所有Partition的Leader被均勻分布到了集群當中,這一點非常重要,因為所有的讀寫操作都由Leader完成,若Leader分布過于集中,會造成集群負載不均衡。但是,隨著集群的運行,該平衡可能會因為Broker的宕機而被打破,該工具就是用來幫助恢復Leader分配的平衡。
      事實上,每個Topic從失敗中恢復過來后,它默認會被設置為Follower角色,除非某個Partition的Replica全部宕機,而當前Broker是該Partition的AR中第一個恢復回來的Replica。因此,某個Partition的Leader(Preferred Replica)宕機并恢復后,它很可能不再是該Partition的Leader,但仍然是Preferred Replica。
      
    原理

    1. 在Zookeeper上創建/admin/preferred_replica_election節點,并存入需要調整Preferred Replica的Partition信息。
    2. Controller一直Watch該節點,一旦該節點被創建,Controller會收到通知,并獲取該內容。
    3. Controller讀取Preferred Replica,如果發現該Replica當前并非是Leader并且它在該Partition的ISR中,Controller向該Replica發送LeaderAndIsrRequest,使該Replica成為Leader。如果該Replica當前并非是Leader,且不在ISR中,Controller為了保證沒有數據丟失,并不會將其設置為Leader。  

    用法
      $KAFKA_HOME/bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181

      在包含8個Broker的Kafka集群上,創建1個名為topic1,replication-factor為3,Partition數為8的Topic,使用$KAFKA_HOME/bin/kafka-topics.sh --describe --topic topic1 --zookeeper localhost:2181命令查看其Partition/Replica分布。

      查詢結果如下圖所示,從圖中可以看到,Kafka將所有Replica均勻分布到了整個集群,并且Leader也均勻分布。
    preferred_topic_test_1

      手動停止部分Broker,topic1的Partition/Replica分布如下圖所示。從圖中可以看到,由于Broker 1/2/4都被停止,Partition 0的Leader由原來的1變為3,Partition 1的Leader由原來的2變為5,Partition 2的Leader由原來的3變為6,Partition 3的Leader由原來的4變為7。
    preferred_topic_test_2  
      
      再重新啟動ID為1的Broker,topic1的Partition/Replica分布如下。可以看到,雖然Broker 1已經啟動(Partition 0和Partition5的ISR中有1),但是1并不是任何一個Parititon的Leader,而Broker 5/6/7都是2個Partition的Leader,即Leader的分布不均衡——一個Broker最多是2個Partition的Leader,而最少是0個Partition的Leader。
    preferred_topic_test_3
      
      運行該工具后,topic1的Partition/Replica分布如下圖所示。由圖可見,除了Partition 1和Partition 3由于Broker 2和Broker 4還未啟動,所以其Leader不是其Preferred Repliac外,其它所有Partition的Leader都是其Preferred Replica。同時,與運行該工具前相比,Leader的分配更均勻——一個Broker最多是2個Parittion的Leader,最少是1個Partition的Leader。
    preferred_topic_test_4
      
      啟動Broker 2和Broker 4,Leader分布與上一步相比并未變化,如下圖所示。
    preferred_topic_test_5

      再次運行該工具,所有Partition的Leader都由其Preferred Replica承擔,Leader分布更均勻——每個Broker承擔1個Partition的Leader角色。
      
      除了手動運行該工具使Leader分配均勻外,Kafka還提供了自動平衡Leader分配的功能,該功能可通過將auto.leader.rebalance.enable設置為true開啟,它將周期性檢查Leader分配是否平衡,若不平衡度超過一定閾值則自動由Controller嘗試將各Partition的Leader設置為其Preferred Replica。檢查周期由leader.imbalance.check.interval.seconds指定,不平衡度閾值由leader.imbalance.per.broker.percentage指定。   

    Kafka Reassign Partitions Tool

    用途
      該工具的設計目標與Preferred Replica Leader Election Tool有些類似,都旨在促進Kafka集群的負載均衡。不同的是,Preferred Replica Leader Election只能在Partition的AR范圍內調整其Leader,使Leader分布均勻,而該工具還可以調整Partition的AR。
      Follower需要從Leader Fetch數據以保持與Leader同步,所以僅僅保持Leader分布的平衡對整個集群的負載均衡來說是不夠的。另外,生產環境下,隨著負載的增大,可能需要給Kafka集群擴容。向Kafka集群中增加Broker非常簡單方便,但是對于已有的Topic,并不會自動將其Partition遷移到新加入的Broker上,此時可用該工具達到此目的。某些場景下,實際負載可能遠小于最初預期負載,此時可用該工具將分布在整個集群上的Partition重裝分配到某些機器上,然后可以停止不需要的Broker從而實現節約資源的目的。
      需要說明的是,該工具不僅可以調整Partition的AR位置,還可調整其AR數量,即改變該Topic的replication factor。
      
    原理
      該工具只負責將所需信息存入Zookeeper中相應節點,然后退出,不負責相關的具體操作,所有調整都由Controller完成。

    1. 在Zookeeper上創建/admin/reassign_partitions節點,并存入目標Partition列表及其對應的目標AR列表。
    2. Controller注冊在/admin/reassign_partitions上的Watch被fire,Controller獲取該列表。
    3. 對列表中的所有Partition,Controller會做如下操作:
    • 啟動RAR - AR中的Replica,即新分配的Replica。(RAR = Reassigned Replicas, AR = Assigned Replicas)
    • 等待新的Replica與Leader同步
    • 如果Leader不在RAR中,從RAR中選出新的Leader
    • 停止并刪除AR - RAR中的Replica,即不再需要的Replica
    • 刪除/admin/reassign_partitions節點

    用法
      該工具有三種使用模式

    • generate模式,給定需要重新分配的Topic,自動生成reassign plan(并不執行)
    • execute模式,根據指定的reassign plan重新分配Partition
    • verify模式,驗證重新分配Partition是否成功

      下面這個例子將使用該工具將Topic的所有Partition重新分配到Broker 4/5/6/7上,步驟如下:

    1. 使用generate模式,生成reassign plan。指定需要重新分配的Topic ({“topics”:[{“topic”:”topic1”}],”version”:1}),并存入/tmp/topics-to-move.json文件中,然后執行
      1
      2
      3
      4
      $KAFKA_HOME/bin/kafka-reassign-partitions.sh 
      --zookeeper localhost:2181
      --topics-to-move-json-file /tmp/topics-to-move.json
      --broker-list "4,5,6,7" --generate

      結果如下圖所示
    reassign_1
      
    2. 使用execute模式,執行reassign plan
      將上一步生成的reassignment plan存入/tmp/reassign-plan.json文件中,并執行

    1
    2
    3
       $KAFKA_HOME/bin/kafka-reassign-partitions.sh 
    --zookeeper localhost:2181
    --reassignment-json-file /tmp/reassign-plan.json --execute

    reassign_2

      此時,Zookeeper上/admin/reassign_partitions節點被創建,且其值與/tmp/reassign-plan.json文件的內容一致。
    reassign_3

    3. 使用verify模式,驗證reassign是否完成。執行verify命令

    1
    2
    3
    $KAFKA_HOME/bin/kafka-reassign-partitions.sh 
    --zookeeper localhost:2181 --verify
    --reassignment-json-file /tmp/reassign-plan.json

      結果如下所示,從圖中可看出topic1的所有Partititon都重新分配成功。
    reassign_4

      接下來用Topic Tool再次驗證。

    1
    bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic1

      結果如下圖所示,從圖中可看出topic1的所有Partition都被重新分配到Broker 4/5/6/7,且每個Partition的AR與reassign plan一致。
    reassign_5

      需要說明的是,在使用execute之前,并不一定要使用generate模式自動生成reassign plan,使用generate模式只是為了方便。事實上,某些場景下,generate模式生成的reassign plan并不一定能滿足需求,此時用戶可以自己設置reassign plan。   

    State Change Log Merge Tool

    用途
      該工具旨在從整個集群的Broker上收集狀態改變日志,并生成一個集中的格式化的日志以幫助診斷狀態改變相關的故障。每個Broker都會將其收到的狀態改變相關的的指令存于名為state-change.log的日志文件中。某些情況下,Partition的Leader Election可能會出現問題,此時我們需要對整個集群的狀態改變有個全局的了解從而診斷故障并解決問題。該工具將集群中相關的state-change.log日志按時間順序合并,同時支持用戶輸入時間范圍和目標Topic及Partition作為過濾條件,最終將格式化的結果輸出。
      
    用法

    1
    2
    3
    bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger
    --logs /opt/kafka_2.11-0.8.2.1/logs/state-change.log
    --topic topic1 --partitions 0,1,2,3,4,5,6,7

    Kafka系列文章

    郭俊 Jason wechat
    歡迎關注作者微信公眾號【大數據架構】
    您的贊賞將支持作者繼續原創分享
    速赢彩app 通辽 | 三亚 | 聊城 | 连云港 | 西双版纳 | 桓台 | 阿勒泰 | 桐乡 | 汉中 | 平潭 | 北海 | 涿州 | 朝阳 | 毕节 | 安阳 | 遵义 | 三门峡 | 赣州 | 赤峰 | 德州 | 江门 | 舟山 | 漯河 | 日喀则 | 汕头 | 衢州 | 吐鲁番 | 济宁 | 无锡 | 邳州 | 雄安新区 | 来宾 | 灌云 | 南安 | 岳阳 | 孝感 | 湘西 | 通辽 | 咸宁 | 图木舒克 | 江苏苏州 | 日喀则 | 黄山 | 四平 | 林芝 | 江西南昌 | 榆林 | 安岳 | 南通 | 宜都 | 铜陵 | 海南海口 | 石河子 | 东海 | 张掖 | 茂名 | 揭阳 | 临海 | 新沂 | 绥化 | 昆山 | 临沧 | 台南 | 周口 | 金昌 | 唐山 | 湛江 | 莱州 | 广饶 | 常州 | 广西南宁 | 衢州 | 三河 | 通辽 | 儋州 | 咸宁 | 余姚 | 肇庆 | 海北 | 白城 | 淄博 | 钦州 | 淄博 | 池州 | 盘锦 | 玉林 | 清徐 | 宁波 | 烟台 | 嘉善 | 吉安 | 固原 | 明港 | 烟台 | 五指山 | 屯昌 | 溧阳 | 呼伦贝尔 | 东方 | 锦州 | 贵州贵阳 | 新疆乌鲁木齐 | 如皋 | 五指山 | 普洱 | 百色 | 长垣 | 顺德 | 果洛 | 五指山 | 宁波 | 菏泽 | 巴音郭楞 |