• <menu id="gyiem"><menu id="gyiem"></menu></menu>
  • <menu id="gyiem"><code id="gyiem"></code></menu>

    Kafka設計解析(一)- Kafka背景及架構介紹

    原創文章,轉載請務必將下面這段話置于文章開頭處。(已授權InfoQ中文站發布
    本文轉發自技術世界原文鏈接 http://www.luozeyang.com/2015/03/10/KafkaColumn1

    摘要

      Kafka是由LinkedIn開發并開源的分布式消息系統,因其分布式及高吞吐率而被廣泛使用,現已與Cloudera Hadoop,Apache Storm,Apache Spark集成。本文介紹了Kafka的創建背景,設計目標,使用消息系統的優勢以及目前流行的消息系統對比。并介紹了Kafka的架構,Producer消息路由,Consumer Group以及由其實現的不同消息分發方式,Topic & Partition,最后介紹了Kafka Consumer為何使用pull模式以及Kafka提供的三種delivery guarantee。

    背景介紹

    Kafka創建背景

      Kafka是一個消息系統,原本開發自LinkedIn,用作LinkedIn的活動流(Activity Stream)和運營數據處理管道(Pipeline)的基礎。現在它已被多家不同類型的公司 作為多種類型的數據管道和消息系統使用。
      活動流數據是幾乎所有站點在對其網站使用情況做報表時都要用到的數據中最常規的部分。活動數據包括頁面訪問量(Page View)、被查看內容方面的信息以及搜索情況等內容。這種數據通常的處理方式是先把各種活動以日志的形式寫入某種文件,然后周期性地對這些文件進行統計分析。運營數據指的是服務器的性能數據(CPU、IO使用率、請求時間、服務日志等等數據)。運營數據的統計方法種類繁多。
      近年來,活動和運營數據處理已經成為了網站軟件產品特性中一個至關重要的組成部分,這就需要一套稍微更加復雜的基礎設施對其提供支持。   

    Kafka簡介

      Kafka是一種分布式的,基于發布/訂閱的消息系統。主要設計目標如下:

    • 以時間復雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數據也能保證常數時間復雜度的訪問性能
    • 高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒100K條以上消息的傳輸
    • 支持Kafka Server間的消息分區,及分布式消費,同時保證每個Partition內的消息順序傳輸
    • 同時支持離線數據處理和實時數據處理
    • Scale out:支持在線水平擴展

    為何使用消息系統

    • 解耦
        在項目啟動之初來預測將來項目會碰到什么需求,是極其困難的。消息系統在處理過程中間插入了一個隱含的、基于數據的接口層,兩邊的處理過程都要實現這一接口。這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。

    • 冗余
        有些情況下,處理數據的過程會失敗。除非數據被持久化,否則將造成丟失。消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的”插入-獲取-刪除”范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。

    • 擴展性
        因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調節參數。擴展就像調大電力按鈕一樣簡單。

    • 靈活性 & 峰值處理能力
        在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見;如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。

    • 可恢復性
        系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。

    • 順序保證
        在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數據會按照特定的順序來處理。Kafka保證一個Partition內的消息的有序性。

    • 緩沖
        在任何重要的系統中,都會有需要不同的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列通過一個緩沖層來幫助任務最高效率的執行———寫入隊列的處理會盡可能的快速。該緩沖有助于控制和優化數據流經過系統的速度。

    • 異步通信
        很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。

    常用Message Queue對比

    • RabbitMQ
        RabbitMQ是使用Erlang編寫的一個開源的消息隊列,本身支持很多的協議:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量級,更適合于企業級的開發。同時實現了Broker構架,這意味著消息在發送給客戶端時先在中心隊列排隊。對路由,負載均衡或者數據持久化都有很好的支持。

    • Redis
        Redis是一個基于Key-Value對的NoSQL數據庫,開發維護很活躍。雖然它是一個Key-Value數據庫存儲系統,但它本身支持MQ功能,所以完全可以當做一個輕量級的隊列服務來使用。對于RabbitMQ和Redis的入隊和出隊操作,各執行100萬次,每10萬次記錄一次執行時間。測試數據分為128Bytes、512Bytes、1K和10K四個不同大小的數據。實驗表明:入隊時,當數據比較小時Redis的性能要高于RabbitMQ,而如果數據大小超過了10K,Redis則慢的無法忍受;出隊時,無論數據大小,Redis都表現出非常好的性能,而RabbitMQ的出隊性能則遠低于Redis。

    • ZeroMQ
        ZeroMQ號稱最快的消息隊列系統,尤其針對大吞吐量的需求場景。ZMQ能夠實現RabbitMQ不擅長的高級/復雜的隊列,但是開發人員需要自己組合多種技術框架,技術上的復雜度是對這MQ能夠應用成功的挑戰。ZeroMQ具有一個獨特的非中間件的模式,你不需要安裝和運行一個消息服務器或中間件,因為你的應用程序將扮演這個服務器角色。你只需要簡單的引用ZeroMQ程序庫,可以使用NuGet安裝,然后你就可以愉快的在應用程序之間發送消息了。但是ZeroMQ僅提供非持久性的隊列,也就是說如果宕機,數據將會丟失。其中,Twitter的Storm 0.9.0以前的版本中默認使用ZeroMQ作為數據流的傳輸(Storm從0.9版本開始同時支持ZeroMQ和Netty作為傳輸模塊)。

    • ActiveMQ
        ActiveMQ是Apache下的一個子項目。 類似于ZeroMQ,它能夠以代理人和點對點的技術實現隊列。同時類似于RabbitMQ,它少量代碼就可以高效地實現高級應用場景。

    • Kafka/Jafka
        Kafka是Apache下的一個子項目,是一個高性能跨語言分布式發布/訂閱消息隊列系統,而Jafka是在Kafka之上孵化而來的,即Kafka的一個升級版。具有以下特性:快速持久化,可以在O(1)的系統開銷下進行消息持久化;高吞吐,在一臺普通的服務器上既可以達到10W/s的吞吐速率;完全的分布式系統,Broker、Producer、Consumer都原生自動支持分布式,自動實現負載均衡;支持Hadoop數據并行加載,對于像Hadoop的一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka通過Hadoop的并行加載機制統一了在線和離線的消息處理。Apache Kafka相對于ActiveMQ是一個非常輕量級的消息系統,除了性能非常好之外,還是一個工作良好的分布式系統。

    Kafka架構

    Terminology

    • Broker
        Kafka集群包含一個或多個服務器,這種服務器被稱為broker
    • Topic
        每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存于一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存于何處)
    • Partition
        Parition是物理上的概念,每個Topic包含一個或多個Partition.
    • Producer
        負責發布消息到Kafka broker
    • Consumer
        消息消費者,向Kafka broker讀取消息的客戶端。
    • Consumer Group
        每個Consumer屬于一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬于默認的group)。

    Kafka拓撲結構

    kafka architecture 架構
      如上圖所示,一個典型的Kafka集群中包含若干Producer(可以是web前端產生的Page View,或者是服務器日志,系統CPU、Memory等),若干broker(Kafka支持水平擴展,一般broker數量越多,集群吞吐率越高),若干Consumer Group,以及一個Zookeeper集群。Kafka通過Zookeeper管理集群配置,選舉leader,以及在Consumer Group發生變化時進行rebalance。Producer使用push模式將消息發布到broker,Consumer使用pull模式從broker訂閱并消費消息。   

    Topic & Partition

      Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把這條消息放進哪個queue里。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個Partition,每個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件。若創建topic1和topic2兩個topic,且分別有13個和19個分區,則整個集群上會相應會生成共32個文件夾(本文所用集群共8個節點,此處topic1和topic2 replication-factor均為1),如下圖所示。
      kafka topic partition
      
      每個日志文件都是一個log entry序列,每個log entry包含一個4字節整型數值(值為N+5),1個字節的”magic value”,4個字節的CRC校驗碼,其后跟N個字節的消息體。每條消息都有一個當前Partition下唯一的64字節的offset,它指明了這條消息的起始位置。磁盤上存儲的消息格式如下:
      message length : 4 bytes (value: 1+4+n)
      “magic” value : 1 byte
      crc : 4 bytes
      payload : n bytes
      這個log entry并非由一個文件構成,而是分成多個segment,每個segment以該segment第一條消息的offset命名并以“.kafka”為后綴。另外會有一個索引文件,它標明了每個segment下包含的log entry的offset范圍,如下圖所示。
      kafka partition event index
      
      因為每條消息都被append到該Partition中,屬于順序寫磁盤,因此效率非常高(經驗證,順序寫磁盤效率比隨機寫內存還要高,這是Kafka高吞吐率的一個很重要的保證)。
      kafka 順序寫磁盤
      
      對于傳統的message queue而言,一般會刪除已經被消費的消息,而Kafka集群會保留所有的消息,無論其被消費與否。當然,因為磁盤限制,不可能永久保留所有數據(實際上也沒必要),因此Kafka提供兩種策略刪除舊數據。一是基于時間,二是基于Partition文件大小。例如可以通過配置$KAFKA_HOME/config/server.properties,讓Kafka刪除一周前的數據,也可在Partition文件超過1GB時刪除舊數據,配置如下所示。

    1
    2
    3
    4
    5
    6
    7
    8
    # The minimum age of a log file to be eligible for deletion
    log.retention.hours=168
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    # The interval at which log segments are checked to see if they can be deleted according to the retention policies
    log.retention.check.interval.ms=300000
    # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
    log.cleaner.enable=false

      這里要注意,因為Kafka讀取特定消息的時間復雜度為O(1),即與文件大小無關,所以這里刪除過期文件與提高Kafka性能無關。選擇怎樣的刪除策略只與磁盤以及具體的需求有關。另外,Kafka會為每一個Consumer Group保留一些metadata信息——當前消費的消息的position,也即offset。這個offset由Consumer控制。正常情況下Consumer會在消費完一條消息后遞增該offset。當然,Consumer也可將offset設成一個較小的值,重新消費一些消息。因為offet由Consumer控制,所以Kafka broker是無狀態的,它不需要標記哪些消息被哪些消費過,也不需要通過broker去保證同一個Consumer Group只有一個Consumer能消費某一條消息,因此也就不需要鎖機制,這也為Kafka的高吞吐率提供了有力保障。   

    Producer消息路由

      Producer發送消息到broker時,會根據Paritition機制選擇將其存儲到哪一個Partition。如果Partition機制設置合理,所有消息可以均勻分布到不同的Partition里,這樣就實現了負載均衡。如果一個Topic對應一個文件,那這個文件所在的機器I/O將會成為這個Topic的性能瓶頸,而有了Partition后,不同的消息可以并行寫入不同broker的不同Partition里,極大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通過配置項num.partitions來指定新建Topic的默認Partition數量,也可在創建Topic時通過參數指定,同時也可以在Topic創建之后通過Kafka提供的工具修改。
      
      在發送一條消息時,可以指定這條消息的key,Producer根據這個key和Partition機制來判斷應該將這條消息發送到哪個Parition。Paritition機制可以通過指定Producer的paritition. class這一參數來指定,該class必須實現kafka.producer.Partitioner接口。本例中如果key可以被解析為整數則將對應的整數與Partition總數取余,該消息會被發送到該數對應的Partition。(每個Parition都會有個序號,序號從0開始)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;

    public class JasonPartitioner<T> implements Partitioner {

    public JasonPartitioner(VerifiableProperties verifiableProperties) {}

    @Override
    public int partition(Object key, int numPartitions) {
    try {
    int partitionNum = Integer.parseInt((String) key);
    return Math.abs(Integer.parseInt((String) key) % numPartitions);
    } catch (Exception e) {
    return Math.abs(key.hashCode() % numPartitions);
    }
    }
    }

      如果將上例中的類作為partition.class,并通過如下代碼發送20條消息(key分別為0,1,2,3)至topic3(包含4個Partition)。
      

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public void sendMessage() throws InterruptedException{
      for(int i = 1; i <= 5; i++){
       List messageList = new ArrayList<KeyedMessage<String, String>>();
       for(int j = 0; j < 4; j++){
       messageList.add(new KeyedMessage<String, String>("topic2", String.valueOf(j), String.format("The %d message for key %d", i, j));
       }
       producer.send(messageList);
    }
      producer.close();
    }

      則key相同的消息會被發送并存儲到同一個partition里,而且key的序號正好和Partition序號相同。(Partition序號從0開始,本例中的key也從0開始)。下圖所示是通過Java程序調用Consumer后打印出的消息列表。
      kafka consumer rebalance   

    Consumer Group

      (本節所有描述都是基于Consumer hight level API而非low level API)。
      使用Consumer high level API時,同一Topic的一條消息只能被同一個Consumer Group內的一個Consumer消費,但多個Consumer Group可同時消費這一消息。
      kafka consumer group
      這是Kafka用來實現一個Topic消息的廣播(發給所有的Consumer)和單播(發給某一個Consumer)的手段。一個Topic可以對應多個Consumer Group。如果需要實現廣播,只要每個Consumer有一個獨立的Group就可以了。要實現單播只要所有的Consumer在同一個Group里。用Consumer Group還可以將Consumer進行自由的分組而不需要多次發送消息到不同的Topic。
      實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可以使用Storm這種實時流處理系統對消息進行實時在線處理,同時使用Hadoop這種批處理系統進行離線處理,還可以同時將數據實時備份到另一個數據中心,只需要保證這三個操作所使用的Consumer屬于不同的Consumer Group即可。下圖是Kafka在Linkedin的一種簡化部署示意圖。
      kafka sample deployment in linkedin
      
      下面這個例子更清晰地展示了Kafka Consumer Group的特性。首先創建一個Topic (名為topic1,包含3個Partition),然后創建一個屬于group1的Consumer實例,并創建三個屬于group2的Consumer實例,最后通過Producer向topic1發送key分別為1,2,3的消息。結果發現屬于group1的Consumer收到了所有的這三條消息,同時group2中的3個Consumer分別收到了key為1,2,3的消息。如下圖所示。
      kafka consumer group   

    Push vs. Pull  

      作為一個消息系統,Kafka遵循了傳統的方式,選擇由Producer向broker push消息并由Consumer從broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。事實上,push模式和pull模式各有優劣。
      push模式很難適應消費速率不同的消費者,因為消息發送速率是由broker決定的。push模式的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成Consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則可以根據Consumer的消費能力以適當的速率消費消息。
      對于Kafka而言,pull模式更合適。pull模式可簡化broker的設計,Consumer可自主控制消費消息的速率,同時Consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。   

    Kafka delivery guarantee

      有這么幾種可能的delivery guarantee:

    • At most once 消息可能會丟,但絕不會重復傳輸
    • At least one 消息絕不會丟,但可能會重復傳輸
    • Exactly once 每條消息肯定會被傳輸一次且僅傳輸一次,很多時候這是用戶所想要的。
        
        當Producer向broker發送消息時,一旦這條消息被commit,因數replication的存在,它就不會丟。但是如果Producer發送數據給broker后,遇到網絡問題而造成通信中斷,那Producer就無法判斷該條消息是否已經commit。雖然Kafka無法確定網絡故障期間發生了什么,但是Producer可以生成一種類似于主鍵的東西,發生故障時冪等性的重試多次,這樣就做到了Exactly once。截止到目前(Kafka 0.8.2版本,2015-03-04),這一Feature還并未實現,有希望在Kafka未來的版本中實現。(所以目前默認情況下一條消息從Producer到broker是確保了At least once,可通過設置Producer異步發送實現At most once)。
        接下來討論的是消息從broker到Consumer的delivery guarantee語義。(僅針對Kafka consumer high level API)。Consumer在從broker讀取消息后,可以選擇commit,該操作會在Zookeeper中保存該Consumer在該Partition中讀取的消息的offset。該Consumer下一次再讀該Partition時會從下一條開始讀取。如未commit,下一次讀取的開始位置會跟上一次commit之后的開始位置相同。當然可以將Consumer設置為autocommit,即Consumer一旦讀到數據立即自動commit。如果只討論這一讀取消息的過程,那Kafka是確保了Exactly once。但實際使用中應用程序并非在Consumer讀取完數據就結束了,而是要進行進一步處理,而數據處理與commit的順序在很大程度上決定了消息從broker和consumer的delivery guarantee semantic。
    • 讀完消息先commit再處理消息。這種模式下,如果Consumer在commit后還沒來得及處理消息就crash了,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息,這就對應于At most once
    • 讀完消息先處理再commit。這種模式下,如果在處理完消息之后commit之前Consumer crash了,下次重新開始工作時還會處理剛剛未commit的消息,實際上該消息已經被處理過了。這就對應于At least once。在很多使用場景下,消息都有一個主鍵,所以消息的處理往往具有冪等性,即多次處理這一條消息跟只處理一次是等效的,那就可以認為是Exactly once。(筆者認為這種說法比較牽強,畢竟它不是Kafka本身提供的機制,主鍵本身也并不能完全保證操作的冪等性。而且實際上我們說delivery guarantee 語義是討論被處理多少次,而非處理結果怎樣,因為處理方式多種多樣,我們不應該把處理過程的特性——如是否冪等性,當成Kafka本身的Feature)
    • 如果一定要做到Exactly once,就需要協調offset和實際操作的輸出。經典的做法是引入兩階段提交。如果能讓offset和操作輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,因為許多輸出系統可能不支持兩階段提交。比如,Consumer拿到數據后可能把數據放到HDFS,如果把最新的offset和數據本身一起寫到HDFS,那就可以保證數據的輸出和offset的更新要么都完成,要么都不完成,間接實現Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,無法存于HDFS,而low level API的offset是由自己去維護的,可以將之存于HDFS中)
        總之,Kafka默認保證At least once,并且允許通過設置Producer異步提交來實現At most once。而Exactly once要求與外部存儲系統協作,幸運的是Kafka提供的offset可以非常直接非常容易得使用這種方式。

      

    Kafka系列文章

    郭俊 Jason wechat
    歡迎關注作者微信公眾號【大數據架構】
    您的贊賞將支持作者繼續原創分享
    速赢彩app 吉林长春 | 甘南 | 包头 | 南通 | 玉林 | 天长 | 邳州 | 昌都 | 肇庆 | 吐鲁番 | 迪庆 | 海西 | 钦州 | 自贡 | 黔南 | 大庆 | 鹤壁 | 舟山 | 象山 | 库尔勒 | 陵水 | 黄石 | 黑河 | 韶关 | 驻马店 | 林芝 | 海门 | 如东 | 涿州 | 南阳 | 延安 | 朔州 | 新余 | 东阳 | 宣城 | 玉林 | 吉林 | 宿迁 | 和县 | 淮安 | 新乡 | 晋中 | 石狮 | 揭阳 | 宁国 | 通化 | 肇庆 | 钦州 | 迪庆 | 宜昌 | 益阳 | 燕郊 | 鄢陵 | 阿勒泰 | 博罗 | 温岭 | 吐鲁番 | 香港香港 | 潮州 | 伊犁 | 日喀则 | 七台河 | 六安 | 云南昆明 | 保定 | 黔南 | 上饶 | 燕郊 | 吉林 | 承德 | 高密 | 永康 | 孝感 | 临汾 | 东莞 | 承德 | 灌南 | 铁岭 | 日土 | 汉中 | 基隆 | 赣州 | 威海 | 昌都 | 宜都 | 黑龙江哈尔滨 | 汉川 | 香港香港 | 塔城 | 澳门澳门 | 诸城 | 扬中 | 泉州 | 曲靖 | 黑河 | 连云港 | 宿州 | 如东 | 灵宝 | 基隆 | 正定 | 陵水 | 抚州 | 嘉善 | 莆田 | 渭南 | 衢州 | 象山 | 鹤壁 | 靖江 | 滨州 | 阿坝 | 肥城 |