• <input id="qucwm"><u id="qucwm"></u></input>
  • <menu id="qucwm"></menu>
  • <input id="qucwm"><tt id="qucwm"></tt></input>
  • <input id="qucwm"><acronym id="qucwm"></acronym></input>
  • 《KAFKA官方文檔》入門指南

    1.入門指南

    1.1簡介

    Apache的Kafka?是一個分布式流平臺(a distributed streaming platform)。這到底意味著什么?

    我們認為,一個流處理平臺應該具有三個關鍵能力:

    1. 它可以讓你發布和訂閱記錄流。在這方面,它類似于一個消息隊列或企業消息系統。
    2. 它可以讓你持久化收到的記錄流,從而具有容錯能力。
    3. 它可以讓你處理收到的記錄流。

    Kafka擅長哪些方面?

    它被用于兩大類應用:

    1. 建立實時流數據管道從而能夠可靠地在系統或應用程序之間的共享數據
    2. 構建實時流應用程序,能夠變換或者對數據
    3. 進行相應的處理。

    想要了解Kafka如何具有這些能力,讓我們從下往上深入探索Kafka的能力。

    首先,明確幾個概念:

    • Kafka是運行在一個或多個服務器的集群(Cluster)上的。
    • Kafka集群分類存儲的記錄流被稱為主題(Topics)。
    • 每個消息記錄包含一個鍵,一個值和時間戳。

    Kafka有四個核心API:

    • 生產者 API?允許應用程序發布記錄流至一個或多個Kafka的話題(Topics)。
    • 消費者API允許應用程序訂閱一個或多個主題,并處理這些主題接收到的記錄流。
    • Streams API允許應用程序充當流處理器(stream processor,從一個或多個主題獲取輸入流,并生產一個輸出流至一個或多個的主題,能夠有效地變換輸入流為輸出流。
    • Connector API允許構建和運行可重用的生產者或消費者,能夠把 Kafka主題連接到現有的應用程序或數據系統。例如,一個連接到關系數據庫的連接器(connector)可能會獲取每個表的變化。

     

    Kafka的客戶端和服務器之間的通信是靠一個簡單的,高性能的,與語言無關的TCP協議完成的。這個協議有不同的版本,并保持向后兼容舊版本(向前兼容舊版本?)。Kafka不光提供了一個Java客戶端,還有許多語言版本的客戶端。

    主題和日志

    讓我們先來了解Kafka的核心抽象概念記錄流 – 主題。

    主題是一種分類或發布的一系列記錄的名義上的名字。Kafka的主題始終是支持多用戶訂閱的;?也就是說,一個主題可以有零個,一個或多個消費者訂閱寫入的數據。

    對于每一個主題,Kafka集群保持一個分區日志文件,看下圖:

    每個分區是一個有序的,不可變的消息序列,新的消息不斷追加到這個有組織的有保證的日志上。分區會給每個消息記錄分配一個順序ID號 – 偏移量, 能夠唯一地標識該分區中的每個記錄。

    Kafka集群保留所有發布的記錄,不管這個記錄有沒有被消費過,Kafka提供可配置的保留策略去刪除舊數據(還有一種策略根據分區大小刪除數據)。例如,如果將保留策略設置為兩天,在記錄公布后兩天,它可用于消費,之后它將被丟棄以騰出空間。Kafka的性能跟存儲的數據量的大小無關, 所以將數據存儲很長一段時間是沒有問題的。

    事實上,保留在每個消費者元數據中的最基礎的數據就是消費者正在處理的當前記錄的偏移量(offset)或位置(position)。這種偏移是由消費者控制:通常偏移會隨著消費者讀取記錄線性前進,但事實上,因為其位置是由消費者進行控制,消費者可以在任何它喜歡的位置讀取記錄。例如,消費者可以恢復到舊的偏移量對過去的數據再加工或者直接跳到最新的記錄,并消費從“現在”開始的新的記錄。

    這些功能的結合意味著,實現Kafka的消費者的代價都是很小的,他們可以增加或者減少而不會對集群或其他消費者有太大影響。例如,你可以使用我們的命令行工具去追隨任何主題,而且不會改變任何現有的消費者消費的記錄。

    數據日志的分區,一舉數得。首先,它們允許數據能夠擴展到更多的服務器上去。每個單獨的分區的大小受到承載它的服務器的限制,但一個話題可能有很多分區,以便它能夠支持海量的的數據。其次,更重要的意義是分區是進行并行處理的基礎單元。

    分布式

    日志的分區會跨服務器的分布在Kafka集群中,每個服務器會共享分區進行數據請求的處理。每個分區可以配置一定數量的副本分區提供容錯能力。

    每個分區都有一個服務器充當“leader”和零個或多個服務器充當“followers”。 leader處理所有的讀取和寫入分區的請求,而followers被動的從領導者拷貝數據。如果leader失敗了,followers之一將自動成為新的領導者。每個服務器可能充當一些分區的leader和其他分區的follower,這樣的負載就會在集群內很好的均衡分配。

    生產者

    生產者發布數據到他們所選擇的主題。生產者負責選擇把記錄分配到主題中的哪個分區。這可以使用輪詢算法(?round-robin)進行簡單地平衡負載,也可以根據一些更復雜的語義分區算法(比如基于記錄一些鍵值)來完成。

    消費者

    消費者以消費群(consumer group?的名稱來標識自己,每個發布到主題的消息都會發送給訂閱了這個主題的消費群里面的一個消費者的一個實例。消費者的實例可以在單獨的進程或單獨的機器上。

    如果所有的消費者實例都屬于相同的消費群,那么記錄將有效地被均衡到每個消費者實例。

    如果所有的消費者實例有不同的消費群,那么每個消息將被廣播到所有的消費者進程。

    兩個服務器的Kafka集群具有四個分區(P0-P3)和兩個消費群。A消費群有兩個消費者,B群有四個。

    更常見的是,我們會發現主題有少量的消費群,每一個都是“邏輯上的訂閱者”。每組都是由很多消費者實例組成,從而實現可擴展性和容錯性。這只不過是發布 – 訂閱模式的再現,區別是這里的訂閱者是一組消費者而不是一個單一的進程的消費者。

    Kafka消費群的實現方式是通過分割日志的分區,分給每個Consumer實例,使每個實例在任何時間點的都可以“公平分享”獨占的分區。維持消費群中的成員關系的這個過程是通過Kafka動態協議處理。如果新的實例加入該組,他將接管該組的其他成員的一些分區;?如果一個實例死亡,其分區將被分配到剩余的實例。

    Kafka只保證一個分區內的消息有序,不能保證一個主題的不同分區之間的消息有序。分區的消息有序與依靠主鍵進行數據分區的能力相結合足以滿足大多數應用的要求。但是,如果你想要保證所有的消息都絕對有序可以只為一個主題分配一個分區,雖然這將意味著每個消費群同時只能有一個消費進程在消費。

    保證

    Kafka提供了以下一些高級別的保證:

    • 由生產者發送到一個特定的主題分區的消息將被以他們被發送的順序來追加。也就是說,如果一個消息M1和消息M2都來自同一個生產者,M1先發,那么M1將有一個低于M2的偏移,會更早在日志中出現。
    • 消費者看到的記錄排序就是記錄被存儲在日志中的順序。
    • 對于副本因子N的主題,我們將承受最多N-1次服務器故障切換而不會損失任何的已經保存的記錄。

    對這些保證的更多細節可以參考文檔的設計部分。

    Kafka作為消息系統

    如何將Kafka的流的概念和傳統的企業信息系統作比較?

    消息處理模型歷來有兩種:隊列發布-訂閱。在隊列模型中,一組消費者可以從服務器讀取記錄,每個記錄都會被其中一個消費者處理;?在發布-訂閱模式里,記錄被廣播到所有的消費者。這兩種模式都具有一定的優點和弱點。隊列的優點是它可以讓你把數據分配到多個消費者去處理,它可以讓您擴展你的處理能力。不幸的是,隊列不支持多個訂閱者,一旦一個進程讀取了數據,這個數據就會消失。發布-訂閱模式可以讓你廣播數據到多個進程,但是因為每一個消息發送到每個訂閱者,沒辦法對訂閱者處理能力進行擴展。

    Kafka的消費群的推廣了這兩個概念。消費群可以像隊列一樣讓消息被一組進程處理(消費群的成員),與發布 – 訂閱模式一樣,Kafka可以讓你發送廣播消息到多個消費群。

    Kafka的模型的優點是,每個主題都具有這兩個屬性,它可以擴展處理能力,也可以實現多個訂閱者,沒有必要二選一。

    Kafka比傳統的消息系統具有更強的消息順序保證的能力。

    傳統的消息隊列的消息在隊列中是有序的,多個消費者從隊列中消費消息,服務器按照存儲的順序派發消息。然而,盡管服務器是按照順序派發消息,但是這些消息記錄被異步傳遞給消費者,消費者接收到的消息也許已經是亂序的了。這實際上意味著消息的排序在并行消費中都將丟失。消息系統通???“排他性消費”(?exclusive consumer)來解決這個問題,只允許一個進程從隊列中消費,當然,這意味著沒有并行處理的能力。

    Kafka做的更好。通過一個概念:并行性-分區-主題實現主題內的并行處理,Kafka是能夠通過一組消費者的進程同時提供排序保證和負載均衡。每個主題的分區指定給每個消費群中的一個消費者,使每個分區只由該組中的一個消費者所消費。通過這樣做,我們確保消費者是一個分區唯一的讀者,從而順序的消費數據。因為有許多的分區,所以負載還能夠均衡的分配到很多的消費者實例上去。但是請注意,一個消費群的消費者實例不能比分區數量多。

    Kafka作為存儲系統

    任何消息隊列都能夠解耦消息的生產和消費,還能夠有效地存儲正在傳送的消息。Kafka與眾不同的是,它是一個非常好的存儲系統。

    Kafka把消息數據寫到磁盤和備份分區。Kafka允許生產者等待返回確認,直到副本復制和持久化全部完成才認為成功,否則則認為寫入服務器失敗。

    Kafka使用的磁盤結構很好擴展,Kafka將執行相同的策略不管你是有50 KB或50TB的持久化數據。

    由于存儲的重要性,并允許客戶控制自己的讀取位置,你可以把Kafka認為是一種特殊用途的分布式文件系統,致力于高性能,低延遲的有保障的日志存儲,能夠備份和自我復制。

    Kafka流處理

    只是讀,寫,以及儲存數據流是不夠的,目的是能夠實時處理數據流。

    在Kafka中,流處理器是從輸入的主題連續的獲取數據流,然后對輸入進行一系列的處理,并生產連續的數據流到輸出主題。

    例如,零售應用程序可能需要輸入銷售和出貨量,根據輸入數據計算出重新訂購的數量和調整后的價格,然后輸出到主題。

    這些簡單處理可以直接使用生產者和消費者的API做到。然而,對于更復雜的轉換Kafka提供了一個完全集成的流API。這允許應用程序把一些重要的計算過程從流中剝離或者加入流一起。

    這種設施可幫助解決這類應用面臨的難題:處理雜亂的數據,改變代碼去重新處理輸入,執行有狀態的計算等

    流API建立在Kafka提供的核心基礎單元之上:它使用生產者和消費者的API進行輸入輸出,使用Kafka存儲有狀態的數據,并使用群組機制在一組流處理實例中實現容錯。

    把功能組合起來

    消息的傳輸,存儲和流處理的組合看似不尋常卻是Kafka作為流處理平臺的關鍵。

    像HDFS分布式文件系統,允許存儲靜態文件進行批量處理。像這樣的系統允許存儲和處理過去的歷史數據。

    傳統的企業消息系統允許處理您訂閱后才抵達的消息。這樣的系統只能處理將來到達的數據。

    Kafka結合了這些功能,這種結合對Kafka作為流應用平臺以及數據流處理的管道至關重要。

    通過整合存儲和低延遲訂閱,流處理應用可以把過去和未來的數據用相同的方式處理。這樣一個單獨的應用程序,不但可以處理歷史的,保存的數據,當它到達最后一條記錄不會停止,繼續等待處理未來到達的數據。這是泛化了的的流處理的概念,包括了批處理應用以及消息驅動的應用。

    同樣,流數據處理的管道結合實時事件的訂閱使人們能夠用Kafka實現低延遲的管道;?可靠的存儲數據的能力使人們有可能使用它傳輸一些重要的必須保證可達的數據??梢耘c一個定期加載數據的線下系統集成,或者與一個因為維護長時間下線的系統集成。流處理的組件能夠保證轉換(處理)到達的數據。

    有關Kafka提供的保證,API和功能的更多信息,看其余文件。

    1.2使用案例

    下面描述了一些使用Apache Kafka?的流行用例。更多的關于這些領域實踐的概述,參考這個博客。

    消息

    Kafka能夠很好的替代傳統的消息中間件。消息中間件由于各種原因被使用(解耦數據的生產和消費,緩沖未處理的消息等)。相較于大多數消息處理系統,Kafka有更好的吞吐量,內置分區,副本復制和容錯性,使其成為大規模消息處理應用的理想解決方案。

    根據我們的經驗消息的使用通常具有相對低的吞吐量,但可能需要端到端的低延遲,以及高可靠性的保證,這種低延遲和可靠性的保證恰恰是Kafka能夠提供的。

    在這一領域Kafka是能夠和傳統的消息系統相媲美的,例如ActiveMQ或?RabbitMQ。

    網站活動跟蹤

    最初的用例是用Kafka重建一個用戶活動跟蹤管道使之作為一組實時發布 – 訂閱的數據源。這意味著網站活動(網頁瀏覽,搜索,或其他可能的操作)被當作一組中心主題發布,每種活動被當作一個主題。這些數據源(feeds)可被一系列的應用訂閱,包括實時處理,實時監測,加載到Hadoop系統或離線數據倉庫系統進行離線處理和報告。

    活動追蹤通常會產生巨大的數據量,因為每個用戶頁面的瀏覽都會產生很多的活動消息。

    測量

    Kafka通常用于監測數據的處理。這涉及從分布式應用程序聚集統計數據,生產出集中的運行數據源feeds(以便訂閱)。

    日志聚合

    許多人用Kafka作為日志聚合解決方案的替代品。日志聚合通常從服務器收集物理日志文件,并把它們放在一個集中的地方(文件服務器或HDFS)進行處理。Kafka抽象了文件的詳細信息,把日志或事件數據的簡潔抽象作為消息流傳輸。這為低時延的處理提供支持,而且更容易支持多個數據源和分布式的數據消費。相比集中式的日志處理系統,Scribe or Flume,Kafka提供同樣良好的性能,而且因為副本備份提供了更強的可靠性保證和更低的端到端延遲。

    流處理

    Kafka的流數據管道在處理數據的時候包含多個階段,其中原始輸入數據從Kafka主題被消費然后匯總,加工,或轉化成新主題用于進一步的消費或后續處理。例如,用于推薦新聞文章的數據流處理管道可能從RSS源抓取文章內容,并將其發布到“文章”主題;?進一步的處理可能是標準化或刪除重復數據,然后發布處理過的文章內容到一個新的話題;?最后的處理階段可能會嘗試推薦這個內容給用戶。這樣的數據流處理管道基于各個主題創建了實時數據數據流程圖。從版本0.10.0.0開始,Apache Kafka加入了輕量級的但功能強大的流處理庫Kafka Streams?,Kafka Streams支持如上所述的數據處理。除了Kafka Streams,可以選擇的開源流處理工具包括?Apache Storm?and?Apache Samza.

    Event Sourcing

    Event sourcing?是一種應用程序設計風格,是按照時間順序記錄的狀態變化的序列。Kafka的非常強大的存儲日志數據的能力使它成為構建這種應用程序的極好的后端選擇。

    Commit Log

    Kafka可以為分布式系統提供一種外部提交日志(commit-log)服務。日志有助于節點之間復制數據,并作為一種數據重新同步機制用來恢復故障節點的數據。Kafka的log compaction?功能有助于支持這種用法。Kafka在這種用法中類似于Apache BookKeeper?項目。

    1.3快速開始

    本教程假設你從零開始,沒有現成的Kafka或ZooKeeper數據。由于Kafka控制臺腳本在Unix基礎的和Windows平臺上的不同,在Windows平臺上使用bin\windows\,而不是bin/,并修改腳本擴展為.bat。

    1步:下載代碼

    下載0.10.2.0釋放和un-tar它。

    > tar -xzf kafka_2.11-0.10.2.0.tgz
    > cd kafka_2.11-0.10.2.0
    

    2步:啟動服務器

    Kafka使用ZooKeeper的,所以你需要先啟動ZooKeeper的服務器,如果你還沒有,您可以使用Kafka包裝里的方便腳本來得到一個快速和污染的單節點的ZooKeeper實例。

    > bin/zookeeper-server-start.sh config/zookeeper.properties
    [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
    ...
    

    現在啟動Kafka服務器:

    > bin/kafka-server-start.sh config/server.properties
    [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
    [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
    ...
    

    3步:創建一個話題

    讓我們創建一個名為“test”主題,只有一個分區,只有一個副本:

    > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    

    現在我們可以看到,如果我們運行的列表主題命令話題:

    > bin/kafka-topics.sh --list --zookeeper localhost:2181
    test
    

    除了手動創建主題,你還可以配置你的代理服務器(broker),當一個不存在的主題被發布的時候它能自動創建相應的主題。

    4步:發送一些消息

    Kafka帶有一個命令行客戶端,獲取從文件或來自標準輸入的輸入,并作為消息發送到Kafka集群。默認情況下,每一行將被作為單獨的消息發送。

    運行生產者腳本,然后輸入一些信息到控制臺發送到服務器。

    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    This is a message
    This is another message
    

    5步:啟動消費者

    Kafka也有一個命令行消費者,將收到的消息輸出到標準輸出。

    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    This is a message
    This is another message
    

    如果你在不同的終端上運行上面的命令,那么你現在應該能看到從生產者終端輸入的消息會出現在消費者終端。

    所有的命令行工具都有其他選項;?不帶參數運行命令將顯示更加詳細的使用信息。

    6步:設置多代理群集

    到目前為止,我們已經運行了單個代理的服務器,但是這沒有樂趣。對于Kafka,一個代理是只有一個單節點的集群,因此多代理集群只是比開始多了一些代理實例外,沒有什么太大的變化。但只是為了感受一下,我們的集群擴展到三個節點(所有的節點還是在本地機器上)。

    首先,我們為每個經紀人做一個配置文件(在Windows上使用copy命令來代替):

    > cp config/server.properties config/server-1.properties
    > cp config/server.properties config/server-2.properties
    

    現在,編輯這些新文件和設置以下屬性:

    config/server-1.properties:
        broker.id=1
        listeners=PLAINTEXT://:9093
        log.dir=/tmp/kafka-logs-1
    
    config/server-2.properties:
        broker.id=2
        listeners=PLAINTEXT://:9094
        log.dir=/tmp/kafka-logs-2
    

    該broker.id屬性是集群中的每個節點的唯一和永久的名字。我們要重寫端口和日志目錄,因為我們都在同一臺機器上運行這些代理,我們要防止經紀人在同一端口上注冊或覆蓋彼此的數據。

    我們已經有Zookeeper服務和我們的單個節點服務,所以我們只需要啟動兩個新節點:

    > bin/kafka-server-start.sh config/server-1.properties &
    ...
    > bin/kafka-server-start.sh config/server-2.properties &
    ...
    

    現在,創建一個新的具有三個的副本因子的主題:

    > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
    

    好了,現在我們有一個集群,但是如何才能知道哪個代理節點在做什么?要查看運行“describe topics”命令:

    > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
    	Topic: my-replicated-topic	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
    

    下面是輸出的解釋。第一行給出了所有分區的摘要,每個附加的行提供了一個分區的信息。由于我們只有一個分區,所以這個主題只有一行。

    • “Leader”,負責指定分區所有讀取和寫入的節點。每個節點將是一部分隨機選擇的分區中的領導者。
    • “Replicas”是此分區日志的節點列表集合,不管這些節點是否是領導者或者只是還活著(不在in-sync狀態)。
    • “ISR”是一組”in-sync” 節點列表的集合。這個列表包括目前活著并跟leader保持同步的replicas,Isr 是Replicas的子集。

    請注意,在我的例子節點1是該主題的唯一分區中的leader。

    我們可以運行相同的命令看看我們創建原來的話題的狀態:

    > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
    	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
    

    所以毫不奇怪,原來的話題沒有副本,只有我們創建它時的唯一的服務器0。

    讓我們發布一些消息到我們新的話題:

    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
    ...
    my test message 1
    my test message 2
    ^C
    

    現在讓我們來消費這些消息:

    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
    ...
    my test message 1
    my test message 2
    ^C
    

    現在,讓我們測試容錯性。代理1是領導者,讓我們殺死它:

    > ps aux | grep server-1.properties
    7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
    > kill -9 7564
    

    在Windows上使用:

    > wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
    java.exe    java  -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.10-0.10.2.0.jar"  kafka.Kafka config\server-1.properties    644
    > taskkill /pid 644 /f
    

    領導權已經切換到備機中的一個節點上去了,節點1不再在同步中的副本集(in-sync replica set)中:

    > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
    	Topic: my-replicated-topic	Partition: 0	Leader: 2	Replicas: 1,2,0	Isr: 2,0
    

    但消息仍然是可用于消費,即使是原來負責寫任務的領導者已經不在了:

    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
    ...
    my test message 1
    my test message 2
    ^C
    

    7步:使用Kafka連接導入/導出數據

    從控制臺寫入數據和寫回控制臺是一個很方便入門的例子,但你可能想用Kafka使用其他來源的數據或導出Kafka的數據到其他系統。相對于許多系統需要編寫定制集成的代碼,您可以使用Kafka連接到系統去導入或導出數據。

    Kafka Connect是包括在Kafka中一個工具,用來導入導出數據到Kafka。它是connectors的一個可擴展工具,其執行定制邏輯,用于與外部系統交互。在這個快速入門,我們將看到如何使用Kafka Connect做一些簡單的連接器從一個文件導入數據到Kafka的主題,和將主題數據導出到一個文件。

    首先,我們需要創建一些原始數據來開始測試:

    > echo -e "foo\nbar" > test.txt
    

    接下來,我們將啟動兩個運行在獨立模式的連接器,這意味著他們在一個單一的,局部的,專用的進程中運行。我們提供三個配置文件作為參數。第一始終是Kafka連接過程中的公共配置,如要連接到的Kafka的代理服務器的配置和數據的序列化格式的配置。剩余的每個配置文件用來創建指定的連接器。這些文件包括一個唯一的連接器名稱,需要實例化的連接器類,還有創建該連接器所需的其他配置。

    > bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
    

    用這些Kafka的示例配置文件,使用前面已經啟動的本地群集的默認配置,建立兩個連接器:第一是一個源連接器,其從輸入文件中讀取每行的內容,發布到的Kafka主題和第二個是一個sink連接器負責從Kafka主題讀取消息,生產出的消息按行輸出到文件。

    在啟動過程中,你會看到一些日志信息,包括一些表明該連接器被實例化的信息。一旦Kafka Connect進程已經開始,源連接器應該開始從test.txt讀取每行的消息,并將其生產發布到主題connect-test,而sink連接器應該從主題connect-test讀取消息,并將其寫入文件test.sink.txt。我們可以通過檢查輸出文件的內容來驗證數據都已通過整個管道輸送:

    > cat test.sink.txt
    foo
    bar
    

    請注意,數據被存儲在Kafka主題的connect-test中,所以我們也可以運行控制臺消費者消費主題中的數據(或使用定制的消費者代碼來處理它):

    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
    {"schema":{"type":"string","optional":false},"payload":"foo"}
    {"schema":{"type":"string","optional":false},"payload":"bar"}
    ...
    

    連接器不停的處理數據,因此我們可以將數據添加到該文件,并能看到數據通過管道移動:

    > echo "Another line" >> test.txt
    

    您應該看到一行消息出現在控制臺消費者的控制臺和sink文件中。

    8步:使用Kafka Streams處理數據

    Kafka Streams?是Kafka的客戶端庫, 用來做實時流處理和分析存儲在Kafka代理服務器的數據。該快速入門例子將演示如何運行這個流應用庫。這里是要點WordCountDemo的示例代碼(轉換為方便閱讀的Java 8 lambda表達式)。

    // Serializers/deserializers (serde) for String and Long types
    final Serde<String> stringSerde = Serdes.String();
    final Serde<Long> longSerde = Serdes.Long();
    
    // Construct a `KStream` from the input topic ""streams-file-input", where message values
    // represent lines of text (for the sake of this example, we ignore whatever may be stored
    // in the message keys).
    KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-file-input");
    
    KTable<String, Long> wordCounts = textLines
        // Split each text line, by whitespace, into words.
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    
        // Group the text words as message keys
        .groupBy((key, value) -> value)
    
        // Count the occurrences of each word (message key).
        .count("Counts")
    
    // Store the running counts as a changelog stream to the output topic.
    wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");
    

    它實現了單詞計數算法,計算輸入文本中一個單詞的出現次數。然而,與其他單詞計數的算法不同,其他的算法一般都是對有界數據進行操作,該算法演示應用程序的表現略有不同,因為他可以被設計去操作無限的,無界的流數據。和操作有界數據的算法相似,它是一個有狀態的算法,可以跟蹤和更新單詞的計數。然而,因為它必須承擔潛在的無界輸入數據的處理,它會周期性地輸出其當前狀態和結果,同時繼續處理更多的數據,因為它無法知道他有沒有處理完“所有”的輸入數據。

    作為第一步驟,我們將準備好輸入到Kafka主題的數據,隨后由Kafka Streams應用程序進行處理。

    > echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
    

    或在Windows上:

    > echo all streams lead to kafka> file-input.txt
    > echo hello kafka streams>> file-input.txt
    > echo|set /p=join kafka summit>> file-input.txt
    

    接下來,我們使用控制臺生產者把輸入的數據發送到主題名streams-file-input?的主題上,其內容從STDIN一行一行的讀取,并一行一行的發布到主題,每一行的消息都有一個空鍵和編碼后的字符串(在實踐中,當應用程序將啟動并運行后,流數據很可能會持續流入Kafka):

    > bin/kafka-topics.sh --create \
                --zookeeper localhost:2181 \
                --replication-factor 1 \
                --partitions 1 \
                --topic streams-file-input
    
    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
    

    現在,我們可以運行單詞計數應用程序來處理輸入數據:

    > bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
    

    演示應用程序將從輸入主題streams-file-input讀取數據,對讀取的消息的執行單詞計數算法,并且持續寫入其當前結果到輸出主題streams-wordcount-output。因此,除了寫回Kafka的日志條目,不會有任何的STDOUT輸出。該演示將運行幾秒鐘,與典型的流處理應用不同,演示程序會自動終止。

    現在,我們通過讀取輸出主題的輸出得到單詞計數演示程序的結果:

    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
                --topic streams-wordcount-output \
                --from-beginning \
                --formatter kafka.tools.DefaultMessageFormatter \
                --property print.key=true \
                --property print.value=true \
                --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
                --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
    

    下面的數據會被輸出到控制臺:

    all     1
    lead    1
    to      1
    hello   1
    streams 2
    join    1
    kafka   3
    summit  1
    

    這里,第一列是java.lang.String類型的消息健,而第二列是java.lang.Long型消息值。注意,這里的輸出其實是數據更新的連續流,每個數據記錄(上面的例子里的每行的輸出)都有一個單詞更新后的數目值,例如“Kafka”作為鍵的記錄。對于具有相同鍵的多個記錄,每個后面的記錄都是前一個記錄的更新。

    下面的兩個圖說明什么發生在幕后的過程。第一列顯示的當前狀態的變化,用KTable<String, Long>來統計單詞出現的數目。第二列顯示KTable狀態更新導致的發生變化的記錄,這個變化的記錄被發送到輸出Kafka主題streams-wordcount-output。

    首先, “all streams lead to kafka”這樣一行文本正在被處理。當新的單詞被處理的時候,KTable會增加一個新的表項(以綠色背景高亮顯示),并有相應的變化記錄發送到下游KStream。

    當第二行“hello kafka streams”被處理的時候,我們觀察到,現有的KTable中的表項第一次被更新(這里: 單詞 “kafka” 和 “streams”)。再次,改變的記錄被發送到輸出話題。

    以此類推(我們跳過的第三行是如何被處理的插圖)。這就解釋了為什么輸出主題有我們上面例子顯示的內容,因為它包含了完整的更改記錄。

    跳出這個具體的例子我們從整體去看, Kafka流利用表和日志變化(changelog)流之間的二元性(here: 表= the KTable, 日志變化流 = the downstream KStream):你可以發布的每一個表的變化去一個流,如果你從開始到結束消費了整個的日志變化(changelog)流,你可以重建表的內容。

    現在,你可以寫更多的輸入信息到streams-file-input主題,并觀察更多的信息加入到了?streams-wordcount-output主題,反映了更新后的單詞數目(例如,使用上述的控制臺生產者和控制臺消費者)。

    您可以通過Ctrl-C?停止控制臺消費者。

    1.4生態系統

    除了Kafka的主要版本之外,還有很多應用集成了Kafka工具。該生態系統頁面中列出的許多工具,包括流處理系統,Hadoop的集成,監控和部署工具。

    1.5從以前版本升級

    0.8.4,0.9.x,0.10.0.x0.10.1.x升級到0.10.2.0

    0.10.2.0的有線協議有變化。通過下面的推薦滾動升級計劃,你能保證在升級過程中無需停機。但是,請在升級之前查看0.10.2.0版本顯著的變化。

    從0.10.2版本開始,Java客戶端(生產者和消費者)已獲得與舊版本代理服務器溝通的能力。版本0.10.2客戶可以跟0.10.0版或更新版本的代理溝通。但是,如果你的代理比0.10.0老,你必須在升級客戶端之前升級Kafka集群中的所有代理服務器(Broker)。版本0.10.2代理支持0.8.x和更新的客戶端。

    對于滾動升級:

    1. 更新所有代理服務器上的server.properties文件,添加以下屬性:
      • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2,0.9.0,0.10.0或0.10.1)。
      • log.message.format.version = CURRENT_KAFKA_VERSION(參見升級后的潛在性能的影響了解此配置做什么的詳細信息。)
    2. 逐一升級代理:關閉代理,更新代碼,并重新啟動。
    3. 一旦整個群集升級成功,通過編輯inter.broker.protocol.version將其設置為0.10.2的協議版本。
    4. 如果您以前的消息格式為0.10.0,改變log.message.format.version至0.10.2(這是一個無效操作,因為0.10.0,0.10.1和0.10.2的消息格式相同)。如果您以前的消息格式版本低于0.10.0,不要改變log.message.format.version – 這個參數只能在所有的消費者都已經升級到0.10.0.0或更高版本之后改動。
    5. 逐一重新啟動代理服務器使新協議版本生效。
    6. 如果這時log.message.format.version仍比0.10.0低,等到所有的消費者都已經升級到0.10.0或更高版本,然后更改每個代理服務器的log.message.format.version到0.10.2,然后逐一重新啟動。

    注意:如果你愿意接受宕機,你可以簡單地把所有的代理服務器關閉,更新代碼,然后重新啟動他們。他們將默認使用新的協議。

    注:改變協議版本并重新啟動可以在代理服務器升級之后的任何時間做,沒有必要必須立刻就做。

    升級0.10.1版本的Kafka流應用

    • 從0.10.1升級您的流應用程序到0.10.2不需要升級代理。0.10.2 Kafka流應用程序可以連接到0.10.2和0.10.1代理(但無法連接到 0.10.0的代理)。
    • 你需要重新編譯代碼。只是替換Kafka流的jar文件將無法正常工作,這破壞你的應用程序。
    • 如果您使用自定義(即用戶實現的)的時間戳提取,則需要更新此代碼,因為TimestampExtractor接口改變了。
    • 如果您注冊了自定義指標,您將需要更新此代碼,因為StreamsMetric接口被改變了。
    • 0.10.2 流 API的變化更多的細節。

    0.10.2.1顯著的變化

    • 對于StreamsConfig類的兩個配置的默認值的修改提高了Kafka流應用的彈性。內部Kafka流生產者retries默認值從0變化到10,內部Kafka流消費者max.poll.interval.ms?缺省值從300000到改變Integer.MAX_VALUE。

    0.10.2.0顯著的變化

    • 在Java客戶端(生產者和消費者)已獲得與舊版本代理溝通的能力。版本0.10.2客戶端可以跟0.10.0版或更新版本的代理溝通。請注意,某些功能在跟就代理溝通的時候不可用或被限制了。
    • 在Java消費者中有幾種方法現在可能拋出InterruptException如果調用線程被中斷。請參閱KafkaConsumer的Javadoc,對這種變化有一個更深入的解釋。
    • Java的消費者現在被恰當關閉。默認情況下,消費者會等待30秒才能完成掛起的請求。一個帶有timeout參數的新的API已添加到KafkaConsumer去控制最大等待時間。
    • 用逗號分隔的多個正則表達式可以傳遞多個Java消費者給MirrorMaker–whitelist選擇。這使得與MirrorMaker使用老Scala消費者時的行為一致。
    • 從0.10.1升級您的流應用程序0.10.2不需要代理服務器升級。Kafka 0.10.2流應用程序可以連接到0.10.2和0.10.1代理(但無法連接到0.10.0代理)。
    • Zookeeper的依賴從流API中刪除。流API現在使用Kafka協議來管理內部主題,而不是直接修改動物園管理員的主題。這消除了需要直接訪問Zookeeper的特權,而“StreamsConfig.ZOOKEEPER_CONFIG”也不需要在流應用被設置。如果Kafka集群是安全認證的,流應用程序必須具備必要的安全權限才可以創建新的主題。
    • 一些新的參數,包括“security.protocol”, “connections.max.idle.ms”, “retry.backoff.ms”, “reconnect.backoff.ms”和“request.timeout.ms”添加到StreamsConfig類。如果用戶需要設置這些,要注意這些默認值。欲了解更多詳情,請參閱3.5Kafka流CONFIGS。
    • 該offsets.topic.replication.factor代理的配置現在在主題生產中強制使用。直到集群的大小符合這個復制因子要求,否則,主題的生產將失敗,返回GROUP_COORDINATOR_NOT_AVAILABLE錯誤。

    新的協議版本

    • KIP-88:OffsetFetchRequest v2支持偏移檢索所有的主題,如果topics數組設置為null。
    • KIP-88:OffsetFetchResponse V2引入了頂級error_code域。
    • KIP-103:UpdateMetadataRequest v3引入一個listener_name字段到end_points數組中的元素。
    • KIP-108:CreateTopicsRequest V1引入了一個validate_only參數。
    • KIP-108:CreateTopicsResponse V1引入了error_message到數組topic_errors的元素。

    0.8.4,0.9.x版本或0.10.0.X升級到0.10.1.0

    0.10.1.0有線協議發生了變化。通過下面的推薦滾動升級計劃,能保證在升級過程中無需停機。但是,請注意在升級之前仔細閱讀0.10.1.0潛在的重大更改。
    注意:由于新協議的引入,它是升級你的客戶端之前請先完成Kafka集群的升級(即0.10.1.x客戶端僅支持0.10.1.x或更高版本的代理,但0.10.1.x的代理可以支持舊版本客戶端)。

    對于滾動升級:

    1. 更新所有代理上的server.properties文件,并添加以下屬性:
      • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2.0,0.9.0.0或0.10.0.0)。
      • log.message.format.version = CURRENT_KAFKA_VERSION(參見升級后的潛在性能的影響對于此配置做什么的詳細信息。)
    2. 升級代理服務器一次一個:關閉代理,更新代碼,并重新啟動。
    3. 一旦整個群集升級完成,通過編輯inter.broker.protocol.version并將其設置為0.10.1.0的協議版本。
    4. 如果您以前的消息格式為0.10.0,改變log.message.format.version至0.10.1(這是一個無效操作,如果0.10.0和0.10.1兩個協議的消息格式相同)。如果您以前的消息格式版本低于0.10.0,不要改變log.message.format.version — 這個參數只能在所有的消費者都已經升級到0.10.0.0或更高版本之后修改。
    5. 逐一重新啟動代理,新版本協議生效。
    6. 如果log.message.format.version仍比0.10.0低,等到所有的消費者都已經升級到0.10.0或更高版本,然后更改log.message.format.version到0.10.1,逐一重新啟動代理服務器。

    注意:如果你愿意接受宕機,你可以簡單地把所有的代理服務器關閉,更新代碼,然后重新啟動他們。他們將默認使用新的協議。

    注:改變協議版本并重新啟動可以在代理服務器升級之后的任何時間做,沒有必要必須立刻就做。

    0.10.1.0的重大更改

    • 日志保留時間不再基于日志段的最后修改時間。相反,它會基于日志段里擁有最大的時間戳的消息。
    • 日志滾動時間不再取決于日志段創建時間。相反,它現在是基于消息的時間戳。進一步來說,如果日志段中第一個消息的時間戳是T,當一個新的消息具有的時間戳大于或等于T + log.roll.m,該日志將被覆蓋。
    • 0.10.0的打開文件的處理程序將增加?33%,因為每個日志段增加的時間索引文件。
    • 時間索引和偏移索引共享相同的索引大小的配置。因為時間索引條目大小是1.5倍偏移索引條目的大小。用戶可能需要增加log.index.size.max.bytes以避免潛在的頻繁的日志滾動。
    • 由于增加的索引文件,在某些代理服務器上具有大量的日志段(例如> 15K),代理啟動期間日志加載過程可能很長。根據我們的實驗,num.recovery.threads.per.data.dir設置為1可減少日志裝載時間。

    升級0.10.0Kafka流應用

    • 從0.10.0升級您的流應用程序到0.10.1確實需要一個代理的升級,因為Kafka 0.10.1的流應用程序只能連接到0.10.1代理。
    • 有幾個API的變化不向后兼容(參見流API在0.10.1的變化有詳細介紹)。因此,你需要更新和重新編譯代碼。只是交換了Kafka流庫的jar文件將無法正常工作,并會破壞你的應用程序。

    0.10.1.0顯著的變化

    • 新的Java消費者不是beta版了,我們推薦它做新的應用開發。老Scala消費者仍然支持,但他們會在未來的版本中將會棄用,并將在未來的主版本中刪除。
    • 在使用像MirrorMaker和控制臺消費者新建消費者的過程中–new-consumer/?–new.consumer開關不再被需要;?一個簡單地使用是通過一個Kafka代理去連接,而不是Zookeeper的合集。此外,控制臺消費者去連接舊版本的消費者已被棄用,并將在未來的主版本中刪除。
    • Kafka集群現在可以通過一個集群ID被唯一標識。其會在一個代理升級到0.10.1.0時自動生成。集群ID經由kafka.server可用:type= KafkaServer,name= ClusterId metric?,它是所述元數據響應的一部分。串行器,客戶端攔截器和度量報告可以通過實現ClusterResourceListener接口接收集群ID。
    • BrokerState “RunningAsController”(值4)已被刪除。由于一個bug,代理在轉換狀態之前只會簡單的這種狀態下,因此去除的影響應該很小。一種推薦的檢測方法是一個給定的代理的控制器是由kafka.controller實現:type=KafkaController,name=ActiveControllerCount metric。
    • 新的Java消費者現在可以允許用戶通過時間戳在分區上搜索偏移量(offset)。
    • 新的Java消費者現在可以從后臺線程支持心跳檢查。有一個新的配置?max.poll.interval.ms,它控制消費者會主動離開組(5分鐘默認情況下)之前輪詢調用的最大時間。配置的值?request.timeout.ms必須始終大于max.poll.interval.ms因為這是一個JoinGroup請求可以在服務器上被阻止到消費者被負載均衡之前的最長時間.所以我們可以改變默認值為剛好超過5分鐘。最后,默認值session.timeout.ms已調整到10秒,默認值max.poll.records已更改為500。
    • 當授權者和用戶沒有說明某個主題的授權,代理將不再返回TOPIC_AUTHORIZATION_FAILED給請求,因為這會泄漏主題名稱。相反,UNKNOWN_TOPIC_OR_PARTITION錯誤代碼將被返回。使用Kafka生產者和消費者通常會在收到未知的主題錯誤時自動重試,這可能會導致意外的超時或延遲。如果你懷疑這種情況發生了,你可以查看客戶端的log去檢查。
    • 獲取返回有默認的大小限制(消費者50 MB和副本的復制10 MB)?,F有的每個分區的限制也適用(消費者和副本復制為1 MB)。請注意,這些限制都不是絕對最大值,在下一個要點有解釋。
    • 消費者和副本可以繼續進行,如果發現一個消息大于返回/分區大小的限制。更具體地,如果在非空的分區上提取的第一個消息比任一個或兩個限值大,仍然會被返回。
    • 重載的構造函數加入到kafka.api.FetchRequest和kafka.javaapi.FetchRequest允許調用者指定分區順序(因為順序在V3是很重要的)。先前存在的構造函數被棄用,在發送請求以避免饑餓問題之前,分區會被洗牌。

    新的協議版本

    • ListOffsetRequest V1支持精確的基于時間戳的偏移搜索。
    • MetadataResponse V2引入了一個新的參數: “CLUSTER_ID”。
    • FetchRequest v3支持限制請求返回的大?。ǔ爽F有的每個分區的限制),它能夠返回比限制更大的消息和在請求中加入分區的順序具有重要意義。
    • JoinGroup V1引入了一個新的字段: “rebalance_timeout”。

    升級0.8.40.9.x版本到0.10.0.0

    0.10.0.0具有的潛在的重大更改(請在升級前仔細檢查更改)和?在升級后的性能影響。通過下面的推薦滾動升級計劃,能保證不宕機,不影響性能和隨后的升級。
    注意:由于新協議的引入,升級客戶端之前升級您的Kafka集群是很重要的。

    注意0.9.0.0版本的客戶端:由于0.9.0.0引入了一個錯誤,即依賴于ZooKeeper的客戶(老Scala高層次消費者和與老消費者一起使用的MirrorMaker)不能和0.10.0.x代理一起工作。因此,代理都升級到0.10.0.x之前, 0.9.0.0客戶端應升級到0.9.0.1?.?這一步對0.8.4或0.9.0.1客戶端沒有必要。

    對于滾動升級:

    1. 更新所有代理服務器的server.properties文件,并添加以下屬性:
      • inter.broker.protocol.version = CURRENT_KAFKA_VERSION(例如0.8.2或0.9.0.0)。
      • log.message.format.version = CURRENT_KAFKA_VERSION(參見升級后的潛在性能的影響對于此配置做什么的詳細信息。)
    2. 升級代理。這可以通過簡單地將其關機,更新代碼,并重新啟動實現。
    3. 一旦整個群集升級結束,通過編輯inter.broker.protocol.version并將其設置為0.10.0.0的協議版本。注意:您不應該修改log.message.format.version — 這個參數只能在所有的消費者都已經升級到0.10.0.0之后再修改。
    4. 逐一重新啟動代理,新協議版本生效。
    5. 一旦所有的消費者都已經升級到0.10.0,逐一修改log.message.format.version至0.10.0和重啟代理服務器。

    注意:如果你愿意接受宕機,你可以簡單地把所有的代理服務器關閉,更新代碼,然后重新啟動他們。他們將默認使用新的協議。

    注:改變協議版本并重新啟動可以在代理服務器升級之后的任何時間做,沒有必要必須立刻就做。

    升級到0.10.0.0帶來的潛在的性能影響

    0.10.0消息格式包括一個新的時間戳字段,并對壓縮的消息使用相對偏移。磁盤上的消息格式可以通過在server.properties文件的log.message.format.version進行配置。默認的磁盤上的消息格式為0.10.0。如果消費者客戶端的版本是0.10.0.0之前的版本,那它只能明白0.10.0之前的消息格式。在這種情況下,代理能夠把消息從0.10.0格式轉換到一個較早的格式再發送舊版本的響應給消費者。然而,代理不能在這種情況下使用零拷貝轉移。Kafka社區報告顯示性能的影響為CPU利用率從20%增加至將近100%,這迫使所有客戶端的必須即時升級使性能恢復正常。為了避免這樣的消息轉換帶來的性能問題,消費者升級到0.10.0.0之前,在升級代理到0.10.0.0的過程中設置log.message.format.version到0.8.2或0.9.0。這樣一來,代理仍然可以使用零拷貝傳輸,將數據發送到老消費者。一旦消費者升級完成,消息格式更改為0.10.0,這樣代理就可以享受新的消息格式包括新的時間戳和改進的壓縮算法。這種轉換可以支持兼容性,對只有幾個還沒有更新到最新客戶端的應用程序非常有用,但不切實際的是使用一個過度使用的集群中去支持所有消費者的流量。因此,當代理已經升級,但大多數客戶端還沒有完成升級的情況,要盡可能避免使用這種信息轉換。

    對于升級到0.10.0.0客戶,沒有性能影響。

    注:設置消息格式版本是一個證明,現有的所有支持的消息都在這個版本或低于該消息格式的版本。否則, 0.10.0.0之前的消費者可能不能正常工作。特別是消息格式設置為0.10.0之后,不應該再改回先前的格式,因為它可能使得0.10.0.0之前的消費者工作異常。

    注:由于每個消息中引入了另外的時間戳,生產者發送的消息大小比較小的時候因為額外的負載開銷也許會看到吞吐量的下降。同樣,副本的復制會讓每個消息額外傳輸8個字節。如果你正在運行接近集群承載能力的網絡容量,你可能會壓垮網卡,由于超載而發生故障和性能問題。

    注:如果您已對生產者啟用壓縮算法,您可能會注意到降低的生產者吞吐量和/或在某些情況下代理降低的壓縮比。當接收到壓縮的消息,0.10.0代理避免再次壓縮消息,其通常降低了等待時間,并提高了吞吐量。在某些情況下,這可能會減少生產者批量消息包的大小,這可能導致更糟糕的吞吐量。如果發生這種情況,用戶可以調整生產者的linger.ms和batch.size以獲得更好的吞吐量。此外,用于高效壓縮消息的生產者緩沖區比代理使用的緩沖區小,這可能對磁盤的壓縮消息比率有負面的影響。我們打算在未來的Kafka版本中能夠配置這些參數。

    0.10.0.0潛在的重大更改

    • 從Kafka0.10.0.0開始,Kafka消息格式的版本被表示為Kafka版本。例如,消息格式0.9.0指通過Kafka0.9.0支持的最高消息版本。
    • 消息格式0.10.0已經推出,它是默認使用的版本。它引入了一個時間戳字段和相對偏移被用于壓縮消息。
    • ProduceRequest /Response?V2已經被引入,它在默認情況下支持消息格式0.10.0
    • FetchRequest /Response?V2已經被引入,它在默認情況下支持消息格式0.10.0
    • MessageFormatter接口從def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)變為?def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)
    • MessageReader接口從def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]變為?def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]
    • MessageFormatter的包從kafka.tools改變為kafka.common
    • MessageReader的包從kafka.tools改變我kafka.common
    • MirrorMakerMessageHandler不再公開方法handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]),因為它從來沒有被調用。
    • 0.7 KafkaMigrationTool不再被打包進Kafka包。如果您需要從0.7遷移到0.10.0,請先遷移到0.8,然后再按照文檔的升級過程升級0.8到0.10.0。
    • 新的消費者擁有標準化的API,接受java.util.Collection作為方法參數序列類型?,F有的代碼可能需要更新才能與0.10.0客戶端庫一起工作。
    • LZ4-compressed的消息處理被改變為使用可互操作的幀規范(LZ4f V1.5.1)。為了保持與舊客戶端的兼容性,這一變化僅適用于消息格式0.10.0及更高版本。使用V0 / V1(消息格式0.9.0)的客戶端應該繼續使用0.9.0幀規范實現執行產生/抓取LZ4壓縮消息。使用生產/獲取協議v2或更高版本客戶端應該使用互操作LZ4f幀規范??苫ゲ僮鞯腖Z4庫的列表,請參考http://www.lz4.org/

    0.10.0.0顯著的變化

    • 從Kafka0.10.0.0開始,新的客戶端庫Kafka可用于流處理存儲在Kafka主題的數據。這個新的客戶端庫只適用于0.10.x及后面版本的代理。欲了解更多信息,請閱讀流文件。
    • 對新的消費者,配置參數receive.buffer.bytes的默認值現在是64k。
    • 新的消費者現在公開暴露配置參數exclude.internal.topics去限制內部主題(諸如消費者偏移主題),不讓這些主題被偶然的包括在正則表達式的主題訂閱中。默認情況下,它處于啟用狀態。
    • 老Scala生產者已被棄用。用戶要盡快遷移他們的代碼到Kafka客戶端JAR里的Java生產者。
    • 新的消費者API已經被標記為穩定。

    升級0.8.0,0.8.1.X0.8.2.X0.9.0.0

    0.9.0.0具有的潛在的重大更改(請在升級前檢查),還有以前的版本到現在的代理間協議的變化。這意味著升級的代理和客戶端可能不兼容舊版本。您在升級您的客戶端之前升級Kafka集群是很重要的。如果您正在使用MirrorMaker下游集群應該先升級為好。

    對于滾動升級:

    1. 更新所有代理上的server.properties文件,并添加以下屬性:inter.broker.protocol.version = 0.8.2.X
    2. 逐一升級的代理??梢酝ㄟ^簡單地將其關閉,更新代碼,并重新啟動它實現。
    3. 一旦整個群集升級成功,通過編輯inter.broker.protocol.version并將其設置為0.9.0.0的協議版本。
    4. 逐一重新啟動代理使新協議版本生效

    注意:如果你愿意接受宕機,你可以簡單地把所有的代理服務器關閉,更新代碼,然后重新啟動他們。他們將默認使用新的協議。

    注:改變協議版本并重新啟動可以在代理服務器升級之后的任何時間做,沒有必要必須立刻就做。

    0.9.0.0潛在的重大更改

    • Java 1.6不再支持。
    • Scala 2.9不再支持。
    • 1000以上的代理ID現在默認保留,用來做自動分配的代理ID。如果您的集群已存在高于閾值的經紀人的ID確保相應地增加reserved.broker.max.id代理配置屬性。
    • 配置參數replica.lag.max.messages被刪除。分區Leader將不再考慮滯后的消息數量來決定哪些副本是同步的,。
    • 配置參數replica.lag.time.max.ms現在不僅指從副本提取請求所花費的時間,也標識副本最后一次同步到現在經過的時間。那些副本仍然從領導者獲取信息,但在replica.lag.time.max.ms時間內沒有從leader最新消息的副本將被認為是不同步的。
    • 壓縮主題不再接受沒有主鍵消息和遇到這種情況生產者會拋出一個異常。在0.8.4,沒有主鍵的消息會導致日志壓縮線程退出(并停止所有壓縮主題的處理)。
    • MirrorMaker不再支持多種目標集群。因此,它只能接受一個–consumer.config參數。要鏡像多個源集群,則需要每個源集群至少一個MirrorMaker實例,每個都有自己的消費者配置。
    • 在包org.apache.kafka.clients.tools.*里的工具已移至org.apache.kafka.tools.*。所有的其中的腳本將仍然像往常一樣起作用,只是直接導入這些類的自定義代碼將受到影響。
    • 默認的KafkaJVM性能選項(KAFKA_JVM_PERFORMANCE_OPTS)已經在kafka-run-class.sh被改變。
    • 該kafka-topics.sh腳本(kafka.admin.TopicCommand)現在失敗會返回非零退出代碼。
    • 該kafka-topics.sh腳本(kafka.admin.TopicCommand)現在碰到由于使用“.”?或“_”的主題的名稱將打印警告信息,以及在實際發生沖突的情況下打印錯誤信息。
    • 該kafka-console-producer.sh腳本(kafka.tools.ConsoleProducer)默認將使用Java生產者而不是舊的Scala生產者,并且用戶必須指定“老生產者”使用舊版本的生產者。
    • 默認情況下,所有命令行工具將打印所有消息記錄到stderr而不是stdout。

    0.9.0.1的顯著變化

    • 新的代理ID自動生成功能可以通過設置broker.id.generation.enable為false禁用。
    • 配置參數log.cleaner.enable現在默認為true。這意味主題在配置 cleanup.policy=compact下將缺省壓縮,清潔器進程通過log.cleaner.dedupe.buffer.size缺省被分配128MB堆。您可以檢查你的配置log.cleaner.dedupe.buffer.size,并根據您的壓縮主題使用其他log.cleaner配置值。
    • 對于新的消費者,配置參數fetch.min.bytes的默認值現在是1。

    0.9.0.0棄用的功能

    • 從kafka-topics.sh腳本(kafka.admin.TopicCommand)改變主題配置已被棄用。今后,請使用kafka-configs.sh腳本(kafka.admin.ConfigCommand)。
    • 該kafka-consumer-offset-checker.sh(kafka.tools.ConsumerOffsetChecker)已被棄用。今后,請使用kafka-consumer-groups.sh(kafka.admin.ConsumerGroupCommand)。
    • 該kafka.tools.ProducerPerformance類已棄用。今后,請使用org.apache.kafka.tools.ProducerPerformance(kafka-producer-perf-test.sh也將改為使用新的類)。
    • 生產者配置block.on.buffer.full已被棄用,并將在未來的版本中刪除。目前,它的默認值已更改為false。該KafkaProducer將不再拋出BufferExhaustedException而是將使用max.block.ms值來阻止,之后它會拋出一個TimeoutException。如果block.on.buffer.full屬性顯式的設置為true,它將設置max.block.ms到Long.MAX_VALUE,而metadata.fetch.timeout.ms將不被認可。

    0.8.1 升級到0.8.2

    0.8.2與0.8.1完全兼容??梢酝ㄟ^簡單地將其關閉,更新代碼,并重新啟動逐一升級代理。

    0.8.0升級到 0.8.1

    0.8.1與0.8完全兼容??梢酝ㄟ^簡單地將其關閉,更新代碼,并重新啟動逐一升級代理。0.7升級

    0.7版本與新版本不兼容。API,Zookeeper的數據結構和協議,可以配置的增加副本(這是在0.7沒有的),都發生了重大變化。從0.7到更高版本的升級需要特殊的工具進行遷移。這種遷移可以無需宕機就可以完成。

     

     

    注:在此文章中這兩個單詞被翻譯為右邊的對應詞語。

    Records – 記錄(持久化的消息)

    Broker –代理服務器

     

    原創文章,轉載請注明: 轉載自并發編程網 – www.okfdzs1913.com本文鏈接地址: 《KAFKA官方文檔》入門指南


    FavoriteLoading添加本文到我的收藏
    • Trackback 關閉
    • 評論 (0)
    1. 暫無評論

    您必須 登陸 后才能發表評論

    return top

    淘宝彩票网 e8u| us8| kic| 8qy| g77| isu| giq| qco| 7wq| o7g| km7| uko| 8km| k6g| ym6| mas| 6ks| a6i| a6c| ikq| wkc| 7ke| mm7| css| guu| q5y| sg5| qeo| u6g| 6me| o6s| wg6| iim| 4ms| e4i| ymg| wmg| 5og| wm5| y5s| ig5| eae| 5ua| y4u| siy| oko| s4k| ggk| mkc| k4u| 4uy| uw5| uuy| 3yc| aas| a9k| 3u3| acg| c3q| cqw| 4sm| ii4| sg4| qqk| a4u| usa| 2ws| ac2| uuq| m3q| ggo| 3ca| wy3| yio| s3k| gic| yyg| 1qk| cq2| ego| ae2| iga| a2a| kmg| 2iq| ok2| giq| g2w| uwg| omy|