免费视频淫片aa毛片_日韩高清在线亚洲专区vr_日韩大片免费观看视频播放_亚洲欧美国产精品完整版

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開通VIP
三萬字 | Kafka 知識體系保姆級教程寶典

Kafka 涉及的知識點(diǎn)如下圖所示,本文將逐一講解:

一、消息隊(duì)列

1. 消息隊(duì)列的介紹

消息(Message)是指在應(yīng)用之間傳送的數(shù)據(jù),消息可以非常簡單,比如只包含文本字符串,也可以更復(fù)雜,可能包含嵌入對象。消息隊(duì)列(Message Queue)是一種應(yīng)用間的通信方式,消息發(fā)送后可以立即返回,有消息系統(tǒng)來確保信息的可靠專遞,消息發(fā)布者只管把消息發(fā)布到MQ中而不管誰來取,消息使用者只管從MQ中取消息而不管誰發(fā)布的,這樣發(fā)布者和使用者都不用知道對方的存在。

2. 消息隊(duì)列的應(yīng)用場景

消息隊(duì)列在實(shí)際應(yīng)用中包括如下四個場景:

  • 應(yīng)用耦合:多應(yīng)用間通過消息隊(duì)列對同一消息進(jìn)行處理,避免調(diào)用接口失敗導(dǎo)致整個過程失?。?/p>

  • 異步處理:多應(yīng)用對消息隊(duì)列中同一消息進(jìn)行處理,應(yīng)用間并發(fā)處理消息,相比串行處理,減少處理時間;

  • 限流削峰:廣泛應(yīng)用于秒殺或搶購活動中,避免流量過大導(dǎo)致應(yīng)用系統(tǒng)掛掉的情況;

  • 消息驅(qū)動的系統(tǒng):系統(tǒng)分為消息隊(duì)列、消息生產(chǎn)者、消息消費(fèi)者,生產(chǎn)者負(fù)責(zé)產(chǎn)生消息,消費(fèi)者(可能有多個)負(fù)責(zé)對消息進(jìn)行處理;

下面詳細(xì)介紹上述四個場景以及消息隊(duì)列如何在上述四個場景中使用:

  1. 異步處理

具體場景:用戶為了使用某個應(yīng)用,進(jìn)行注冊,系統(tǒng)需要發(fā)送注冊郵件并驗(yàn)證短信。對這兩個操作的處理方式有兩種:串行及并行。

  • 串行方式:新注冊信息生成后,先發(fā)送注冊郵件,再發(fā)送驗(yàn)證短信;

    在這種方式下,需要最終發(fā)送驗(yàn)證短信后再返回給客戶端。

  • 并行處理:新注冊信息寫入后,由發(fā)短信和發(fā)郵件并行處理;

    在這種方式下,發(fā)短信和發(fā)郵件 需處理完成后再返回給客戶端。假設(shè)以上三個子系統(tǒng)處理的時間均為50ms,且不考慮網(wǎng)絡(luò)延遲,則總的處理時間:

    串行:50+50+50=150ms
    并行:50+50 = 100ms

  • 若使用消息隊(duì)列:

在寫入消息隊(duì)列后立即返回成功給客戶端,則總的響應(yīng)時間依賴于寫入消息隊(duì)列的時間,而寫入消息隊(duì)列的時間本身是可以很快的,基本可以忽略不計(jì),因此總的處理時間相比串行提高了2倍,相比并行提高了一倍;

  1. 應(yīng)用耦合

具體場景:用戶使用QQ相冊上傳一張圖片,人臉識別系統(tǒng)會對該圖片進(jìn)行人臉識別,一般的做法是,服務(wù)器接收到圖片后,圖片上傳系統(tǒng)立即調(diào)用人臉識別系統(tǒng),調(diào)用完成后再返回成功,如下圖所示:

該方法有如下缺點(diǎn):

  • 人臉識別系統(tǒng)被調(diào)失敗,導(dǎo)致圖片上傳失敗;

  • 延遲高,需要人臉識別系統(tǒng)處理完成后,再返回給客戶端,即使用戶并不需要立即知道結(jié)果;

  • 圖片上傳系統(tǒng)與人臉識別系統(tǒng)之間互相調(diào)用,需要做耦合;

若使用消息隊(duì)列:

客戶端上傳圖片后,圖片上傳系統(tǒng)將圖片信息如uin、批次寫入消息隊(duì)列,直接返回成功;而人臉識別系統(tǒng)則定時從消息隊(duì)列中取數(shù)據(jù),完成對新增圖片的識別。

此時圖片上傳系統(tǒng)并不需要關(guān)心人臉識別系統(tǒng)是否對這些圖片信息的處理、以及何時對這些圖片信息進(jìn)行處理。事實(shí)上,由于用戶并不需要立即知道人臉識別結(jié)果,人臉識別系統(tǒng)可以選擇不同的調(diào)度策略,按照閑時、忙時、正常時間,對隊(duì)列中的圖片信息進(jìn)行處理。

  1. 限流削峰

具體場景:購物網(wǎng)站開展秒殺活動,一般由于瞬時訪問量過大,服務(wù)器接收過大,會導(dǎo)致流量暴增,相關(guān)系統(tǒng)無法處理請求甚至崩潰。而加入消息隊(duì)列后,系統(tǒng)可以從消息隊(duì)列中取數(shù)據(jù),相當(dāng)于消息隊(duì)列做了一次緩沖。

該方法有如下優(yōu)點(diǎn):

  • 請求先入消息隊(duì)列,而不是由業(yè)務(wù)處理系統(tǒng)直接處理,做了一次緩沖,極大地減少了業(yè)務(wù)處理系統(tǒng)的壓力;

  • 隊(duì)列長度可以做限制,事實(shí)上,秒殺時,后入隊(duì)列的用戶無法秒殺到商品,這些請求可以直接被拋棄,返回活動已結(jié)束或商品已售完信息;

4.消息驅(qū)動的系統(tǒng)

具體場景:用戶新上傳了一批照片,人臉識別系統(tǒng)需要對這個用戶的所有照片進(jìn)行聚類,聚類完成后由對賬系統(tǒng)重新生成用戶的人臉?biāo)饕?加快查詢)。這三個子系統(tǒng)間由消息隊(duì)列連接起來,前一個階段的處理結(jié)果放入隊(duì)列中,后一個階段從隊(duì)列中獲取消息繼續(xù)處理。

該方法有如下優(yōu)點(diǎn):

  • 避免了直接調(diào)用下一個系統(tǒng)導(dǎo)致當(dāng)前系統(tǒng)失?。?/p>

  • 每個子系統(tǒng)對于消息的處理方式可以更為靈活,可以選擇收到消息時就處理,可以選擇定時處理,也可以劃分時間段按不同處理速度處理;

3. 消息隊(duì)列的兩種模式

消息隊(duì)列包括兩種模式,點(diǎn)對點(diǎn)模式(point to point, queue)和發(fā)布/訂閱模式(publish/subscribe,topic)

1) 點(diǎn)對點(diǎn)模式

點(diǎn)對點(diǎn)模式下包括三個角色:

  • 消息隊(duì)列
  • 發(fā)送者 (生產(chǎn)者)
  • 接收者(消費(fèi)者)
    消息發(fā)送者生產(chǎn)消息發(fā)送到queue中,然后消息接收者從queue中取出并且消費(fèi)消息。消息被消費(fèi)以后,queue中不再有存儲,所以消息接收者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息。

點(diǎn)對點(diǎn)模式特點(diǎn):

  • 每個消息只有一個接收者(Consumer)(即一旦被消費(fèi),消息就不再在消息隊(duì)列中);
  • 發(fā)送者和接發(fā)收者間沒有依賴性,發(fā)送者發(fā)送消息之后,不管有沒有接收者在運(yùn)行,都不會影響到發(fā)送者下次發(fā)送消息;
  • 接收者在成功接收消息之后需向隊(duì)列應(yīng)答成功,以便消息隊(duì)列刪除當(dāng)前接收的消息;

2) 發(fā)布/訂閱模式

發(fā)布/訂閱模式下包括三個角色:

  • 角色主題(Topic)
  • 發(fā)布者(Publisher)
  • 訂閱者(Subscriber)
    發(fā)布者將消息發(fā)送到Topic,系統(tǒng)將這些消息傳遞給多個訂閱者。

發(fā)布/訂閱模式特點(diǎn):

  • 每個消息可以有多個訂閱者;
  • 發(fā)布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創(chuàng)建一個訂閱者之后,才能消費(fèi)發(fā)布者的消息。
  • 為了消費(fèi)消息,訂閱者需要提前訂閱該角色主題,并保持在線運(yùn)行;

4. 常用的消息隊(duì)列介紹

1) RabbitMQ

RabbitMQ 2007年發(fā)布,是一個在AMQP(高級消息隊(duì)列協(xié)議)基礎(chǔ)上完成的,可復(fù)用的企業(yè)消息系統(tǒng),是當(dāng)前最主流的消息中間件之一。

2) ActiveMQ

ActiveMQ是由Apache出品,ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)。它非??焖伲С侄喾N語言的客戶端和協(xié)議,而且可以非常容易的嵌入到企業(yè)的應(yīng)用環(huán)境中,并有許多高級功能。

3) RocketMQ

RocketMQ出自 阿里公司的開源產(chǎn)品,用 Java 語言實(shí)現(xiàn),在設(shè)計(jì)時參考了 Kafka,并做出了自己的一些改進(jìn),消息可靠性上比 Kafka 更好。RocketMQ在阿里集團(tuán)被廣泛應(yīng)用在訂單,交易,充值,流計(jì)算,消息推送,日志流式處理等。

4) Kafka

Apache Kafka是一個分布式消息發(fā)布訂閱系統(tǒng)。它最初由LinkedIn公司基于獨(dú)特的設(shè)計(jì)實(shí)現(xiàn)為一個分布式的提交日志系統(tǒng)( a distributed commit log),之后成為Apache項(xiàng)目的一部分。Kafka系統(tǒng)快速、可擴(kuò)展并且可持久化。它的分區(qū)特性,可復(fù)制和可容錯都是其不錯的特性。

5. Pulsar

Apahce Pulasr是一個企業(yè)級的發(fā)布-訂閱消息系統(tǒng),最初是由雅虎開發(fā),是下一代云原生分布式消息流平臺,集消息、存儲、輕量化函數(shù)式計(jì)算為一體,采用計(jì)算與存儲分離架構(gòu)設(shè)計(jì),支持多租戶、持久化存儲、多機(jī)房跨區(qū)域數(shù)據(jù)復(fù)制,具有強(qiáng)一致性、高吞吐、低延時及高可擴(kuò)展性等流數(shù)據(jù)存儲特性。

Pulsar 非常靈活:它既可以應(yīng)用于像 Kafka 這樣的分布式日志應(yīng)用場景,也可以應(yīng)用于像 RabbitMQ 這樣的純消息傳遞系統(tǒng)場景。它支持多種類型的訂閱、多種交付保證、保留策略以及處理模式演變的方法,以及其他諸多特性。

1. Pulsar 的特性

  • 內(nèi)置多租戶:不同的團(tuán)隊(duì)可以使用相同的集群并將其隔離,解決了許多管理難題。它支持隔離、身份驗(yàn)證、授權(quán)和配額;

  • 多層體系結(jié)構(gòu):Pulsar 將所有 topic 數(shù)據(jù)存儲在由 Apache BookKeeper 支持的專業(yè)數(shù)據(jù)層中。存儲和消息傳遞的分離解決了擴(kuò)展、重新平衡和維護(hù)集群的許多問題。它還提高了可靠性,幾乎不可能丟失數(shù)據(jù)。另外,在讀取數(shù)據(jù)時可以直連 BookKeeper,且不影響實(shí)時攝取。例如,可以使用 Presto 對 topic 執(zhí)行 SQL 查詢,類似于 KSQL,但不會影響實(shí)時數(shù)據(jù)處理;

  • 虛擬 topic:由于采用 n 層體系結(jié)構(gòu),因此對 topic 的數(shù)量沒有限制,topic 及其存儲是分離的。用戶還可以創(chuàng)建非持久性 topic;

  • N 層存儲:Kafka 的一個問題是,存儲費(fèi)用可能變高。因此,它很少用于存儲'冷'數(shù)據(jù),并且消息經(jīng)常被刪除,Apache Pulsar 可以借助分層存儲自動將舊數(shù)據(jù)卸載到 Amazon S3 或其他數(shù)據(jù)存儲系統(tǒng),并且仍然向客戶端展示透明視圖;Pulsar 客戶端可以從時間開始節(jié)點(diǎn)讀取,就像所有消息都存在于日志中一樣;

2. Pulsar 存儲架構(gòu)

Pulsar 的多層架構(gòu)影響了存儲數(shù)據(jù)的方式。Pulsar 將 topic 分區(qū)劃分為分片(segment),然后將這些分片存儲在 Apache BookKeeper 的存儲節(jié)點(diǎn)上,以提高性能、可伸縮性和可用性。

Pulsar 的無限分布式日志以分片為中心,借助擴(kuò)展日志存儲(通過 Apache BookKeeper)實(shí)現(xiàn),內(nèi)置分層存儲支持,因此分片可以均勻地分布在存儲節(jié)點(diǎn)上。

由于與任一給定 topic 相關(guān)的數(shù)據(jù)都不會與特定存儲節(jié)點(diǎn)進(jìn)行捆綁,因此很容易替換存儲節(jié)點(diǎn)或縮擴(kuò)容。另外,集群中最小或最慢的節(jié)點(diǎn)也不會成為存儲或帶寬的短板。

Pulsar 架構(gòu)能實(shí)現(xiàn)分區(qū)管理,負(fù)載均衡,因此使用 Pulsar 能夠快速擴(kuò)展并達(dá)到高可用。這兩點(diǎn)至關(guān)重要,所以 Pulsar 非常適合用來構(gòu)建關(guān)鍵任務(wù)服務(wù),如金融應(yīng)用場景的計(jì)費(fèi)平臺,電子商務(wù)和零售商的交易處理系統(tǒng),金融機(jī)構(gòu)的實(shí)時風(fēng)險(xiǎn)控制系統(tǒng)等。

通過性能強(qiáng)大的 Netty 架構(gòu),數(shù)據(jù)從 producers 到 broker,再到 bookie 的轉(zhuǎn)移都是零拷貝,不會生成副本。這一特性對所有流應(yīng)用場景都非常友好,因?yàn)閿?shù)據(jù)直接通過網(wǎng)絡(luò)或磁盤進(jìn)行傳輸,沒有任何性能損失。

3. Pulsar 消息消費(fèi)

Pulsar 的消費(fèi)模型采用了流拉取的方式。流拉取是長輪詢的改進(jìn)版,不僅實(shí)現(xiàn)了單個調(diào)用和請求之間的零等待,還可以提供雙向消息流。通過流拉取模型,Pulsar 實(shí)現(xiàn)了端到端的低延遲,這種低延遲比所有現(xiàn)有的長輪詢消息系統(tǒng)(如 Kafka)都低。

6. Kafka與Pulsar對比

1. Pulsar 的主要優(yōu)勢:

  • 更多功能:Pulsar Function、多租戶、Schema registry、n 層存儲、多種消費(fèi)模式和持久性模式等;

  • 更大的靈活性:3 種訂閱類型(獨(dú)占,共享和故障轉(zhuǎn)移),用戶可以在一個訂閱上管理多個 topic;

  • 易于操作運(yùn)維:架構(gòu)解耦和 n 層存儲;

  • 與 Presto 的 SQL 集成,可直接查詢存儲而不會影響 broker;

  • 借助 n 層自動存儲選項(xiàng),可以更低成本地存儲;

2. Pulsar 的劣勢

Pulsar 并不完美,Pulsar 也存在一些問題:

  • 相對缺乏支持、文檔和案例;

  • n 層體系結(jié)構(gòu)導(dǎo)致需要更多組件:BookKeeper;

  • 插件和客戶端相對 Kafka 較少;

  • 云中的支持較少,Confluent 具有托管云產(chǎn)品。

3. 什么時候應(yīng)該考慮 Pulsar

  • 同時需要像 RabbitMQ 這樣的隊(duì)列和 Kafka 這樣的流處理程序;

  • 需要易用的地理復(fù)制;

  • 實(shí)現(xiàn)多租戶,并確保每個團(tuán)隊(duì)的訪問權(quán)限;

  • 需要長時間保留消息,并且不想將其卸載到另一個存儲中;

  • 需要高性能,基準(zhǔn)測試表明 Pulsar 提供了更低的延遲和更高的吞吐量;

總之,Pulsar還比較新,社區(qū)不完善,用的企業(yè)比較少,網(wǎng)上有價值的討論和問題的解決比較少,遠(yuǎn)沒有Kafka生態(tài)系統(tǒng)龐大,且用戶量非常龐大,目前Kafka依舊是大數(shù)據(jù)領(lǐng)域消息隊(duì)列的王者!所以我們還是以Kafka為主!

7. 其他消息隊(duì)列與Kafka對比

二、Kafka基礎(chǔ)

1. kafka的基本介紹

官網(wǎng):http://kafka.apache.org/

kafka是最初由linkedin公司開發(fā)的,使用scala語言編寫,kafka是一個分布式,分區(qū)的,多副本的,多訂閱者的日志系統(tǒng)(分布式MQ系統(tǒng)),可以用于搜索日志,監(jiān)控日志,訪問日志等。

Kafka is a distributed, partitioned, replicated commit logservice。它提供了類似于JMS的特性,但是在設(shè)計(jì)實(shí)現(xiàn)上完全不同,此外它并不是JMS規(guī)范的實(shí)現(xiàn)。

kafka對消息保存時根據(jù)Topic進(jìn)行歸類,發(fā)送消息者成為Producer, 消息接受者成為Consumer, 此外kafka集群有多個kafka實(shí)例組成,每個實(shí)例(server)成為broker。無論是kafka集群,還是producer和consumer都依賴于zookeeper來保證系統(tǒng)可用性集群保存一些meta信息。

2. kafka的好處

  • 可靠性:分布式的,分區(qū),復(fù)本和容錯的。
  • 可擴(kuò)展性:kafka消息傳遞系統(tǒng)輕松縮放,無需停機(jī)。
  • 耐用性:kafka使用分布式提交日志,這意味著消息會盡可能快速的保存在磁盤上,因此它是持久的。
  • 性能:kafka對于發(fā)布和定于消息都具有高吞吐量。即使存儲了許多TB的消息,他也爆出穩(wěn)定的性能。
  • kafka非???/strong>:保證零停機(jī)和零數(shù)據(jù)丟失。

3. 分布式的發(fā)布與訂閱系統(tǒng)

apache kafka是一個分布式發(fā)布-訂閱消息系統(tǒng)和一個強(qiáng)大的隊(duì)列,可以處理大量的數(shù)據(jù),并使能夠?qū)⑾囊粋€端點(diǎn)傳遞到另一個端點(diǎn),kafka適合離線和在線消息消費(fèi)。

kafka消息保留在磁盤上,并在集群內(nèi)復(fù)制以防止數(shù)據(jù)丟失。kafka構(gòu)建在zookeeper同步服務(wù)之上。它與apache和spark非常好的集成,應(yīng)用于實(shí)時流式數(shù)據(jù)分析。

4. kafka的主要應(yīng)用場景

1. 指標(biāo)分析

kafka 通常用于操作監(jiān)控?cái)?shù)據(jù)。這設(shè)計(jì)聚合來自分布式應(yīng)用程序的統(tǒng)計(jì)信息,以產(chǎn)生操作的數(shù)據(jù)集中反饋

2. 日志聚合解決方法

kafka可用于跨組織從多個服務(wù)器收集日志,并使他們以標(biāo)準(zhǔn)的格式提供給多個服務(wù)器。

3. 流式處理

流式處理框架(spark,storm,?ink)重主題中讀取數(shù)據(jù),對齊進(jìn)行處理,并將處理后的數(shù)據(jù)寫入新的主題,供 用戶和應(yīng)用程序使用,kafka的強(qiáng)耐久性在流處理的上下文中也非常的有用。

三、Kafka架構(gòu)及組件

1. kafka架構(gòu)

  1. 生產(chǎn)者API

允許應(yīng)用程序發(fā)布記錄流至一個或者多個kafka的主題(topics)。

  1. 消費(fèi)者API

允許應(yīng)用程序訂閱一個或者多個主題,并處理這些主題接收到的記錄流。

3. StreamsAPI

允許應(yīng)用程序充當(dāng)流處理器(stream processor),從一個或者多個主題獲取輸入流,并生產(chǎn)一個輸出流到一個或 者多個主題,能夠有效的變化輸入流為輸出流。

  1. ConnectAPI

允許構(gòu)建和運(yùn)行可重用的生產(chǎn)者或者消費(fèi)者,能夠把kafka主題連接到現(xiàn)有的應(yīng)用程序或數(shù)據(jù)系統(tǒng)。例如:一個連接到關(guān)系數(shù)據(jù)庫的連接器可能會獲取每個表的變化。

Kafka 架構(gòu)

注:在Kafka 2.8.0 版本,移除了對Zookeeper的依賴,通過KRaft進(jìn)行自己的集群管理,使用Kafka內(nèi)部的Quorum控制器來取代ZooKeeper,因此用戶第一次可在完全不需要ZooKeeper的情況下執(zhí)行Kafka,這不只節(jié)省運(yùn)算資源,并且也使得Kafka效能更好,還可支持規(guī)模更大的集群。

過去Apache ZooKeeper是Kafka這類分布式系統(tǒng)的關(guān)鍵,ZooKeeper扮演協(xié)調(diào)代理的角色,所有代理服務(wù)器啟動時,都會連接到Zookeeper進(jìn)行注冊,當(dāng)代理狀態(tài)發(fā)生變化時,Zookeeper也會儲存這些數(shù)據(jù),在過去,ZooKeeper是一個強(qiáng)大的工具,但是畢竟ZooKeeper是一個獨(dú)立的軟件,使得Kafka整個系統(tǒng)變得復(fù)雜,因此官方?jīng)Q定使用內(nèi)部Quorum控制器來取代ZooKeeper。

這項(xiàng)工作從去年4月開始,而現(xiàn)在這項(xiàng)工作取得部分成果,用戶將可以在2.8版本,在沒有ZooKeeper的情況下執(zhí)行Kafka,官方稱這項(xiàng)功能為Kafka Raft元數(shù)據(jù)模式(KRaft)。

在KRaft模式,過去由Kafka控制器和ZooKeeper所操作的元數(shù)據(jù),將合并到這個新的Quorum控制器,并且在Kafka集群內(nèi)部執(zhí)行,當(dāng)然,如果使用者有特殊使用情境,Quorum控制器也可以在專用的硬件上執(zhí)行。

說完在新版本中移除zookeeper這個事,接著聊kafka的其他功能:

kafka支持消息持久化,消費(fèi)端是主動拉取數(shù)據(jù),消費(fèi)狀態(tài)和訂閱關(guān)系由客戶端負(fù)責(zé)維護(hù),消息消費(fèi)完后,不會立即刪除,會保留歷史消息。因此支持多訂閱時,消息只會存儲一份就可以。

  1. broker:kafka集群中包含一個或者多個服務(wù)實(shí)例(節(jié)點(diǎn)),這種服務(wù)實(shí)例被稱為broker(一個broker就是一個節(jié)點(diǎn)/一個服務(wù)器);
  2. topic:每條發(fā)布到kafka集群的消息都屬于某個類別,這個類別就叫做topic;
  3. partition:partition是一個物理上的概念,每個topic包含一個或者多個partition;
  4. segment:一個partition當(dāng)中存在多個segment文件段,每個segment分為兩部分,.log文件和 .index 文件,其中 .index 文件是索引文件,主要用于快速查詢, .log 文件當(dāng)中數(shù)據(jù)的偏移量位置;
  5. producer:消息的生產(chǎn)者,負(fù)責(zé)發(fā)布消息到 kafka 的 broker 中;
  6. consumer:消息的消費(fèi)者,向 kafka 的 broker 中讀取消息的客戶端;
  7. consumer group:消費(fèi)者組,每一個 consumer 屬于一個特定的 consumer group(可以為每個consumer指定 groupName);
  8. .log:存放數(shù)據(jù)文件;
  9. .index:存放.log文件的索引數(shù)據(jù)。

2. Kafka 主要組件

1. producer(生產(chǎn)者)

producer主要是用于生產(chǎn)消息,是kafka當(dāng)中的消息生產(chǎn)者,生產(chǎn)的消息通過topic進(jìn)行歸類,保存到kafka的broker里面去。

2. topic(主題)

  1. kafka將消息以topic為單位進(jìn)行歸類;
  2. topic特指kafka處理的消息源(feeds of messages)的不同分類;
  3. topic是一種分類或者發(fā)布的一些列記錄的名義上的名字。kafka主題始終是支持多用戶訂閱的;也就是說,一 個主題可以有零個,一個或者多個消費(fèi)者訂閱寫入的數(shù)據(jù);
  4. 在kafka集群中,可以有無數(shù)的主題;
  5. 生產(chǎn)者和消費(fèi)者消費(fèi)數(shù)據(jù)一般以主題為單位。更細(xì)粒度可以到分區(qū)級別。

3. partition(分區(qū))

kafka當(dāng)中,topic是消息的歸類,一個topic可以有多個分區(qū)(partition),每個分區(qū)保存部分topic的數(shù)據(jù),所有的partition當(dāng)中的數(shù)據(jù)全部合并起來,就是一個topic當(dāng)中的所有的數(shù)據(jù)。

一個broker服務(wù)下,可以創(chuàng)建多個分區(qū),broker數(shù)與分區(qū)數(shù)沒有關(guān)系;
在kafka中,每一個分區(qū)會有一個編號:編號從0開始。
每一個分區(qū)內(nèi)的數(shù)據(jù)是有序的,但全局的數(shù)據(jù)不能保證是有序的。(有序是指生產(chǎn)什么樣順序,消費(fèi)時也是什么樣的順序)

4. consumer(消費(fèi)者)

consumer是kafka當(dāng)中的消費(fèi)者,主要用于消費(fèi)kafka當(dāng)中的數(shù)據(jù),消費(fèi)者一定是歸屬于某個消費(fèi)組中的。

5. consumer group(消費(fèi)者組)

消費(fèi)者組由一個或者多個消費(fèi)者組成,同一個組中的消費(fèi)者對于同一條消息只消費(fèi)一次。

每個消費(fèi)者都屬于某個消費(fèi)者組,如果不指定,那么所有的消費(fèi)者都屬于默認(rèn)的組。

每個消費(fèi)者組都有一個ID,即group ID。組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來消費(fèi)一個訂閱主題( topic)的所有分區(qū)(partition)。

當(dāng)然,每個分區(qū)只能由同一個消費(fèi)組內(nèi)的一個消費(fèi)者(consumer)來消費(fèi),可以由不同的消費(fèi)組來消費(fèi)。

partition數(shù)量決定了每個consumer group中并發(fā)消費(fèi)者的最大數(shù)量。如下圖:

示例 1

如上面左圖所示,如果只有兩個分區(qū),即使一個組內(nèi)的消費(fèi)者有4個,也會有兩個空閑的。
如上面右圖所示,有4個分區(qū),每個消費(fèi)者消費(fèi)一個分區(qū),并發(fā)量達(dá)到最大4。

在來看如下一幅圖:

示例 2

如上圖所示,不同的消費(fèi)者組消費(fèi)同一個topic,這個topic有4個分區(qū),分布在兩個節(jié)點(diǎn)上。左邊的消費(fèi)組1有兩個消費(fèi)者,每個消費(fèi)者就要消費(fèi)兩個分區(qū)才能把消息完整的消費(fèi)完,右邊的消費(fèi)組2有四個消費(fèi)者,每個消費(fèi)者消費(fèi)一個分區(qū)即可。

總結(jié)下kafka中分區(qū)與消費(fèi)組的關(guān)系

消費(fèi)組:由一個或者多個消費(fèi)者組成,同一個組中的消費(fèi)者對于同一條消息只消費(fèi)一次。某一個主題下的分區(qū)數(shù),對于消費(fèi)該主題的同一個消費(fèi)組下的消費(fèi)者數(shù)量,應(yīng)該小于等于該主題下的分區(qū)數(shù)。

如:某一個主題有4個分區(qū),那么消費(fèi)組中的消費(fèi)者應(yīng)該小于等于4,而且最好與分區(qū)數(shù)成整數(shù)倍 124 這樣。同一個分區(qū)下的數(shù)據(jù),在同一時刻,不能同一個消費(fèi)組的不同消費(fèi)者消費(fèi)。

總結(jié):分區(qū)數(shù)越多,同一時間可以有越多的消費(fèi)者來進(jìn)行消費(fèi),消費(fèi)數(shù)據(jù)的速度就會越快,提高消費(fèi)的性能。

6. partition replicas(分區(qū)副本)

kafka 中的分區(qū)副本如下圖所示:

kafka 分區(qū)副本

副本數(shù)(replication-factor):控制消息保存在幾個broker(服務(wù)器)上,一般情況下副本數(shù)等于broker的個數(shù)。

一個broker服務(wù)下,不可以創(chuàng)建多個副本因子。創(chuàng)建主題時,副本因子應(yīng)該小于等于可用的broker數(shù)。

副本因子操作以分區(qū)為單位的。每個分區(qū)都有各自的主副本和從副本;

主副本叫做leader,從副本叫做 follower(在有多個副本的情況下,kafka會為同一個分區(qū)下的所有分區(qū),設(shè)定角色關(guān)系:一個leader和N個 follower),處于同步狀態(tài)的副本叫做in-sync-replicas(ISR);

follower通過拉的方式從leader同步數(shù)據(jù)。消費(fèi)者和生產(chǎn)者都是從leader讀寫數(shù)據(jù),不與follower交互。

副本因子的作用:讓kafka讀取數(shù)據(jù)和寫入數(shù)據(jù)時的可靠性。

副本因子是包含本身,同一個副本因子不能放在同一個broker中。

如果某一個分區(qū)有三個副本因子,就算其中一個掛掉,那么只會剩下的兩個中,選擇一個leader,但不會在其他的broker中,另啟動一個副本(因?yàn)樵诹硪慌_啟動的話,存在數(shù)據(jù)傳遞,只要在機(jī)器之間有數(shù)據(jù)傳遞,就會長時間占用網(wǎng)絡(luò)IO,kafka是一個高吞吐量的消息系統(tǒng),這個情況不允許發(fā)生)所以不會在另一個broker中啟動。

如果所有的副本都掛了,生產(chǎn)者如果生產(chǎn)數(shù)據(jù)到指定分區(qū)的話,將寫入不成功。

lsr表示:當(dāng)前可用的副本。

7. segment文件

一個partition當(dāng)中由多個segment文件組成,每個segment文件,包含兩部分,一個是 .log 文件,另外一個是 .index 文件,其中 .log 文件包含了我們發(fā)送的數(shù)據(jù)存儲,.index 文件,記錄的是我們.log文件的數(shù)據(jù)索引值,以便于我們加快數(shù)據(jù)的查詢速度。

索引文件與數(shù)據(jù)文件的關(guān)系

既然它們是一一對應(yīng)成對出現(xiàn),必然有關(guān)系。索引文件中元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址。

比如索引文件中 3,497 代表:數(shù)據(jù)文件中的第三個message,它的偏移地址為497。

再來看數(shù)據(jù)文件中,Message 368772表示:在全局partiton中是第368772個message。

注:segment index file 采取稀疏索引存儲方式,減少索引文件大小,通過mmap(內(nèi)存映射)可以直接內(nèi)存操作,稀疏索引為數(shù)據(jù)文件的每個對應(yīng)message設(shè)置一個元數(shù)據(jù)指針,它比稠密索引節(jié)省了更多的存儲空間,但查找起來需要消耗更多的時間。

.index 與 .log 對應(yīng)關(guān)系如下:

.index 與 .log

上圖左半部分是索引文件,里面存儲的是一對一對的key-value,其中key是消息在數(shù)據(jù)文件(對應(yīng)的log文件)中的編號,比如“1,3,6,8……”,分別表示在log文件中的第1條消息、第3條消息、第6條消息、第8條消息……

那么為什么在index文件中這些編號不是連續(xù)的呢?這是因?yàn)閕ndex文件中并沒有為數(shù)據(jù)文件中的每條消息都建立索引,而是采用了稀疏存儲的方式,每隔一定字節(jié)的數(shù)據(jù)建立一條索引。

這樣避免了索引文件占用過多的空間,從而可以將索引文件保留在內(nèi)存中。但缺點(diǎn)是沒有建立索引的Message也不能一次定位到其在數(shù)據(jù)文件的位置,從而需要做一次順序掃描,但是這次順序掃描的范圍就很小了。

value代表的是在全局partiton中的第幾個消息。

以索引文件中元數(shù)據(jù) 3,497 為例,其中3代表在右邊log數(shù)據(jù)文件中從上到下第3個消息,497表示該消息的物理偏移地址(位置)為497(也表示在全局partiton表示第497個消息-順序?qū)懭胩匦?。

log日志目錄及組成kafka在我們指定的log.dir目錄下,會創(chuàng)建一些文件夾;名字是 (主題名字-分區(qū)名) 所組成的文件夾。在(主題名字-分區(qū)名)的目錄下,會有兩個文件存在,如下所示:

#索引文件
00000000000000000000.index
#日志內(nèi)容
00000000000000000000.log

在目錄下的文件,會根據(jù)log日志的大小進(jìn)行切分,.log文件的大小為1G的時候,就會進(jìn)行切分文件;如下:

-rw-r--r--. 1 root root 389k  1月  17  18:03   00000000000000000000.index
-rw-r--r--. 1 root root 1.0G  1月  17  18:03   00000000000000000000.log
-rw-r--r--. 1 root root  10M  1月  17  18:03   00000000000000077894.index
-rw-r--r--. 1 root root 127M  1月  17  18:03   00000000000000077894.log

在kafka的設(shè)計(jì)中,將offset值作為了文件名的一部分。

segment文件命名規(guī)則:partion全局的第一個segment從0開始,后續(xù)每個segment文件名為上一個全局partion的最大offset(偏移message數(shù))。數(shù)值最大為64位long大小,20位數(shù)字字符長度,沒有數(shù)字就用0填充。

通過索引信息可以快速定位到message。通過index元數(shù)據(jù)全部映射到內(nèi)存,可以避免segment File的IO磁盤操作;

通過索引文件稀疏存儲,可以大幅降低index文件元數(shù)據(jù)占用空間大小。

稀疏索引:為了數(shù)據(jù)創(chuàng)建索引,但范圍并不是為每一條創(chuàng)建,而是為某一個區(qū)間創(chuàng)建;好處:就是可以減少索引值的數(shù)量。不好的地方:找到索引區(qū)間之后,要得進(jìn)行第二次處理。

8. message的物理結(jié)構(gòu)

生產(chǎn)者發(fā)送到kafka的每條消息,都被kafka包裝成了一個message

message的物理結(jié)構(gòu)如下圖所示:

.index 與 .log

所以生產(chǎn)者發(fā)送給kafka的消息并不是直接存儲起來,而是經(jīng)過kafka的包裝,每條消息都是上圖這個結(jié)構(gòu),只有最后一個字段才是真正生產(chǎn)者發(fā)送的消息數(shù)據(jù)。

四、Kafka集群操作

1. 創(chuàng)建topic

創(chuàng)建一個名字為test的主題, 有三個分區(qū),有兩個副本:

bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic test

2. 查看主題命令

查看kafka當(dāng)中存在的主題:

bin/kafka-topics.sh  --list --zookeeper node01:2181,node02:2181,node03:2181

3. 生產(chǎn)者生產(chǎn)數(shù)據(jù)

模擬生產(chǎn)者來生產(chǎn)數(shù)據(jù):

bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test

4. 消費(fèi)者消費(fèi)數(shù)據(jù)

執(zhí)行以下命令來模擬消費(fèi)者進(jìn)行消費(fèi)數(shù)據(jù):

bin/kafka-console-consumer.sh --from-beginning --topic test  --zookeeper node01:2181,node02:2181,node03:2181

5. 運(yùn)行describe  topics命令

執(zhí)行以下命令運(yùn)行describe查看topic的相關(guān)信息:

bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test

結(jié)果說明:

這是輸出的解釋。第一行給出了所有分區(qū)的摘要,每個附加行提供有關(guān)一個分區(qū)的信息。由于我們只有一個分 區(qū)用于此主題,因此只有一行。

“l(fā)eader”是負(fù)責(zé)給定分區(qū)的所有讀取和寫入的節(jié)點(diǎn)。每個節(jié)點(diǎn)將成為隨機(jī)選擇的分區(qū)部分的領(lǐng)導(dǎo)者。(因?yàn)樵趉afka中 如果有多個副本的話,就會存在leader和follower的關(guān)系,表示當(dāng)前這個副本為leader所在的broker是哪一個)

“replicas”是復(fù)制此分區(qū)日志的節(jié)點(diǎn)列表,無論它們是否為領(lǐng)導(dǎo)者,或者即使它們當(dāng)前處于活動狀態(tài)。(所有副本列表0,1,2)

“isr”是“同步”復(fù)制品的集合。這是副本列表的子集,該列表當(dāng)前處于活躍狀態(tài)并且已經(jīng)被領(lǐng)導(dǎo)者捕獲。(可用的列表數(shù))

6. 增加topic分區(qū)數(shù)

執(zhí)行以下命令可以增加topic分區(qū)數(shù):

bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8

7. 增加配置

動態(tài)修改kakfa的配置:

bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --config flush.messages=1

8. 刪除配置

動態(tài)刪除kafka集群配置:

bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --delete-config flush.messages

9.  刪除topic

目前刪除topic在默認(rèn)情況下知識打上一個刪除的標(biāo)記,在重新啟動kafka后才刪除。

如果需要立即刪除,則需要在server.properties中配置:

delete.topic.enable=true

然后執(zhí)行以下命令進(jìn)行刪除topic:

kafka-topics.sh --zookeeper zkhost:port --delete --topic topicName

五、Kafka的JavaAPI操作

1. 生產(chǎn)者代碼

使用生產(chǎn)者,生產(chǎn)數(shù)據(jù)

/**
* 訂單的生產(chǎn)者代碼,
*/
public class OrderProducer {
public static void main(String[] args) throws InterruptedException {
/* 1、連接集群,通過配置文件的方式
* 2、發(fā)送數(shù)據(jù)-topic:order,value
*/
Properties props = new Properties(); props.put('bootstrap.servers', 'node01:9092'); props.put('acks', 'all');
props.put('retries', 0);
props.put('batch.size', 16384);
props.put('linger.ms', 1);
props.put('buffer.memory', 33554432); 
props.put('key.serializer',
'org.apache.kafka.common.serialization.StringSerializer'); 
props.put('value.serializer',
'org.apache.kafka.common.serialization.StringSerializer');
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>
(props);
for (int i = 0; i < 1000; i++) {
// 發(fā)送數(shù)據(jù) ,需要一個producerRecord對象,最少參數(shù) String topic, V value kafkaProducer.send(new ProducerRecord<String, String>('order', '訂單信
息!'+i));
Thread.sleep(100);
}
}
}

kafka當(dāng)中的數(shù)據(jù)分區(qū):

kafka生產(chǎn)者發(fā)送的消息,都是保存在broker當(dāng)中,我們可以自定義分區(qū)規(guī)則,決定消息發(fā)送到哪個partition里面去進(jìn)行保存查看ProducerRecord這個類的源碼,就可以看到kafka的各種不同分區(qū)策略

kafka當(dāng)中支持以下四種數(shù)據(jù)的分區(qū)方式:

//第一種分區(qū)策略,如果既沒有指定分區(qū)號,也沒有指定數(shù)據(jù)key,那么就會使用輪詢的方式將數(shù)據(jù)均勻的發(fā)送到不同的分區(qū)里面去
//ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>('mypartition', 'mymessage' + i);
//kafkaProducer.send(producerRecord1);
//第二種分區(qū)策略 如果沒有指定分區(qū)號,指定了數(shù)據(jù)key,通過key.hashCode  % numPartitions來計(jì)算數(shù)據(jù)究竟會保存在哪一個分區(qū)里面
//注意:如果數(shù)據(jù)key,沒有變化   key.hashCode % numPartitions  =  固定值  所有的數(shù)據(jù)都會寫入到某一個分區(qū)里面去
//ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>('mypartition', 'mykey', 'mymessage' + i);
//kafkaProducer.send(producerRecord2);
//第三種分區(qū)策略:如果指定了分區(qū)號,那么就會將數(shù)據(jù)直接寫入到對應(yīng)的分區(qū)里面去
//  ProducerRecord<String, String> producerRecord3 = new ProducerRecord<>('mypartition', 0, 'mykey', 'mymessage' + i);
// kafkaProducer.send(producerRecord3);
//第四種分區(qū)策略:自定義分區(qū)策略。如果不自定義分區(qū)規(guī)則,那么會將數(shù)據(jù)使用輪詢的方式均勻的發(fā)送到各個分區(qū)里面去
kafkaProducer.send(new ProducerRecord<String, String>('mypartition','mymessage'+i));

自定義分區(qū)策略:

public class KafkaCustomPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
}

@Override
public int partition(String topic, Object arg1, byte[] keyBytes, Object arg3, byte[] arg4, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int partitionNum = partitions.size();
Random random = new Random();
int partition = random.nextInt(partitionNum);
return partition;
}

@Override
public void close() {

}

}

主代碼中添加配置:

@Test
public void kafkaProducer() throws Exception {
//1、準(zhǔn)備配置文件
Properties props = new Properties();
props.put('bootstrap.servers', 'node01:9092,node02:9092,node03:9092');
props.put('acks', 'all');
props.put('retries', 0);
props.put('batch.size', 16384);
props.put('linger.ms', 1);
props.put('buffer.memory', 33554432);
props.put('partitioner.class', 'cn.itcast.kafka.partitioner.KafkaCustomPartitioner');
props.put('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer');
props.put('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer');
//2、創(chuàng)建KafkaProducer
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
for (int i=0;i<100;i++){
//3、發(fā)送數(shù)據(jù)
kafkaProducer.send(new ProducerRecord<String, String>('testpart','0','value'+i));
}

kafkaProducer.close();
}

2. 消費(fèi)者代碼

消費(fèi)必要條件:

消費(fèi)者要從kafka  Cluster進(jìn)行消費(fèi)數(shù)據(jù),必要條件有以下四個:

  1. 地址:bootstrap.servers=node01:9092

  2. 序列化:key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer

  3. 主題(topic):需要制定具體的某個topic(order)即可。

  4. 消費(fèi)者組:group.id=test

1) 自動提交offset

消費(fèi)完成之后,自動提交offset:

/**
* 消費(fèi)訂單數(shù)據(jù)--- javaben.tojson
*/
public class OrderConsumer {
public static void main(String[] args) {
// 1\連接集群
Properties props = new Properties(); props.put('bootstrap.servers', 'hadoop-01:9092'); props.put('group.id', 'test');

//以下兩行代碼 ---消費(fèi)者自動提交offset值 
props.put('enable.auto.commit', 'true'); 
props.put('auto.commit.interval.ms',  '1000');
props.put('key.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer');
props.put('value.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer');
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>
(props);
//   2、發(fā)送數(shù)據(jù) 發(fā)送數(shù)據(jù)需要,訂閱下要消費(fèi)的topic。order kafkaConsumer.subscribe(Arrays.asList('order')); 
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);// jdk queue offer插入、poll獲取元素。blockingqueue put插入原生, take獲取元素
for (ConsumerRecord<String, String> record : consumerRecords) { System.out.println('消費(fèi)的數(shù)據(jù)為:' + record.value());
}
}
}
}

2) 手動提交offset

如果Consumer在獲取數(shù)據(jù)后,需要加入處理,數(shù)據(jù)完畢后才確認(rèn)offset,需要程序來控制offset的確認(rèn)。

關(guān)閉自動提交確認(rèn)選項(xiàng):props.put('enable.auto.commit', 'false');

手動提交offset值:kafkaConsumer.commitSync();

完整代碼如下:

Properties props = new Properties(); 
props.put('bootstrap.servers', 'localhost:9092'); 
props.put('group.id', 'test');
//關(guān)閉自動提交確認(rèn)選項(xiàng)
props.put('enable.auto.commit', 'false'); 
props.put('key.deserializer',
'org.apache.kafka.common.serialization.StringDeserializer'); 
props.put('value.deserializer',
'org.apache.kafka.common.serialization.StringDeserializer'); 
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList('test'));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); 
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) { 
insertIntoDb(buffer);
// 手動提交offset值
consumer.commitSync(); 
buffer.clear();
}
}

3) 消費(fèi)完每個分區(qū)之后手動提交offset

上面的示例使用commitSync將所有已接收的記錄標(biāo)記為已提交。在某些情況下,可能希望通過明確指定偏移量來更好地控制已提交的記錄。在下面的示例中,我們在完成處理每個分區(qū)中的記錄后提交偏移量:

try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); 
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ': ' + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() -1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally { consumer.close();}

注意事項(xiàng)

提交的偏移量應(yīng)始終是應(yīng)用程序?qū)⒆x取的下一條消息的偏移量。因此,在調(diào)用commitSync(偏移量)時,應(yīng)該在最后處理的消息的偏移量中添加一個。

4) 指定分區(qū)數(shù)據(jù)進(jìn)行消費(fèi)

  1. 如果進(jìn)程正在維護(hù)與該分區(qū)關(guān)聯(lián)的某種本地狀態(tài)(如本地磁盤上的鍵值存儲),那么它應(yīng)該只獲取它在磁盤上維護(hù)的分區(qū)的記錄。

  2. 如果進(jìn)程本身具有高可用性,并且如果失敗則將重新啟動(可能使用YARN,Mesos或AWS工具等集群管理框 架,或作為流處理框架的一部分)。在這種情況下,Kafka不需要檢測故障并重新分配分區(qū),因?yàn)橄倪^程將在另一臺機(jī)器上重新啟動。

Properties props = new Properties(); props.put('bootstrap.servers', 'localhost:9092'); props.put('group.id', 'test'); 
props.put('enable.auto.commit', 'true');
props.put('auto.commit.interval.ms', '1000'); 
props.put('key.deserializer',
'org.apache.kafka.common.serialization.StringDeserializer'); 
props.put('value.deserializer',
'org.apache.kafka.common.serialization.StringDeserializer'); 
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//consumer.subscribe(Arrays.asList('foo',  'bar'));

//手動指定消費(fèi)指定分區(qū)的數(shù)據(jù)---start 
String topic = 'foo';
TopicPartition partition0 = new TopicPartition(topic, 0); 
TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0,  partition1));
//手動指定消費(fèi)指定分區(qū)的數(shù)據(jù)---end
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100); 
for (ConsumerRecord<String, String> record : records)
System.out.printf('offset = %d, key = %s, value = %s%n', record.offset(), record.key(), record.value());
}

注意事項(xiàng)

  1. 要使用此模式,只需使用要使用的分區(qū)的完整列表調(diào)用assign(Collection),而不是使用subscribe訂閱主題。

  2. 主題與分區(qū)訂閱只能二選一。

5) 重復(fù)消費(fèi)與數(shù)據(jù)丟失

說明:

  1. 已經(jīng)消費(fèi)的數(shù)據(jù)對于kafka來說,會將消費(fèi)組里面的o?set值進(jìn)行修改,那什么時候進(jìn)行修改了?是在數(shù)據(jù)消費(fèi) 完成之后,比如在控制臺打印完后自動提交;

  2. 提交過程:是通過kafka將o?set進(jìn)行移動到下個message所處的o?set的位置。

  3. 拿到數(shù)據(jù)后,存儲到hbase中或者mysql中,如果hbase或者mysql在這個時候連接不上,就會拋出異常,如果在處理數(shù)據(jù)的時候已經(jīng)進(jìn)行了提交,那么kafka傷的o?set值已經(jīng)進(jìn)行了修改了,但是hbase或者mysql中沒有數(shù)據(jù),這個時候就會出現(xiàn)數(shù)據(jù)丟失。

  4. 什么時候提交o?set值?在Consumer將數(shù)據(jù)處理完成之后,再來進(jìn)行o?set的修改提交。默認(rèn)情況下o?set是 自動提交,需要修改為手動提交o?set值。
  5. 如果在處理代碼中正常處理了,但是在提交o?set請求的時候,沒有連接到kafka或者出現(xiàn)了故障,那么該次修 改o?set的請求是失敗的,那么下次在進(jìn)行讀取同一個分區(qū)中的數(shù)據(jù)時,會從已經(jīng)處理掉的o?set值再進(jìn)行處理一 次,那么在hbase中或者mysql中就會產(chǎn)生兩條一樣的數(shù)據(jù),也就是數(shù)據(jù)重復(fù)。

6) consumer消費(fèi)者消費(fèi)數(shù)據(jù)流程

流程描述

Consumer連接指定的Topic partition所在leader broker,采用pull方式從kafkalogs中獲取消息。對于不同的消費(fèi)模式,會將offset保存在不同的地方官網(wǎng)關(guān)于high level  API  以及l(fā)ow  level  API的簡介:http://kafka.apache.org/0100/documentation.html#impl_consumer

高階API(High Level API)

kafka消費(fèi)者高階API簡單;隱藏Consumer與Broker細(xì)節(jié);相關(guān)信息保存在zookeeper中:

/* create a connection to the cluster */
ConsumerConnector connector = Consumer.create(consumerConfig);

interface ConsumerConnector {

/**
This method is used to get a list of KafkaStreams, which are iterators over
MessageAndMetadata objects from which you can obtain messages and their
associated metadata (currently only topic).
Input: a map of <topic, #streams>
Output: a map of <topic, list of message streams>
*/
public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
/**
You can also obtain a list of KafkaStreams, that iterate over messages
from topics that match a TopicFilter. (A TopicFilter encapsulates a
whitelist or a blacklist which is a standard Java regex.)
*/
public List<KafkaStream> createMessageStreamsByFilter( TopicFilter topicFilter, int numStreams);
/* Commit the offsets of all messages consumed so far. */ public commitOffsets()
/* Shut down the connector */ public shutdown()
}

說明:大部分的操作都已經(jīng)封裝好了,比如:當(dāng)前消費(fèi)到哪個位置下了,但是不夠靈活(工作過程推薦使用)

低級API(Low Level API):

kafka消費(fèi)者低級API非常靈活;需要自己負(fù)責(zé)維護(hù)連接Controller Broker。保存offset,Consumer Partition對應(yīng)關(guān)系:

class SimpleConsumer {

/* Send fetch request to a broker and get back a set of messages. */ 
public ByteBufferMessageSet fetch(FetchRequest request);

/* Send a list of fetch requests to a broker and get back a response set. */ public MultiFetchResponse multifetch(List<FetchRequest> fetches);

/**

Get a list of valid offsets (up to maxSize) before the given time.
The result is a list of offsets, in descending order.
@param time: time in millisecs,
if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest
offset available. if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest

available. public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);

* offset
*/

說明:沒有進(jìn)行包裝,所有的操作有用戶決定,如自己的保存某一個分區(qū)下的記錄,你當(dāng)前消費(fèi)到哪個位置。

3. kafka Streams API開發(fā)

需求:使用StreamAPI獲取test這個topic當(dāng)中的數(shù)據(jù),然后將數(shù)據(jù)全部轉(zhuǎn)為大寫,寫入到test2這個topic當(dāng)中去。

第一步:創(chuàng)建一個topic

node01服務(wù)器使用以下命令來常見一個 topic 名稱為test2:

bin/kafka-topics.sh --create  --partitions 3 --replication-factor 2 --topic test2 --zookeeper node01:2181,node02:2181,node03:2181

第二步:開發(fā)StreamAPI

public class StreamAPI {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, 'wordcount-application');
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'node01:9092');
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
KStreamBuilder builder = new KStreamBuilder();
builder.stream('test').mapValues(line -> line.toString().toUpperCase()).to('test2');
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
}
}

執(zhí)行上述代碼,監(jiān)聽獲取 test中的數(shù)據(jù),然后轉(zhuǎn)成大寫,將結(jié)果寫入 test2。

第三步:生產(chǎn)數(shù)據(jù)

node01執(zhí)行以下命令,向test這個topic當(dāng)中生產(chǎn)數(shù)據(jù):

bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test

第四步:消費(fèi)數(shù)據(jù)

node02執(zhí)行一下命令消費(fèi)test2這個topic當(dāng)中的數(shù)據(jù):

bin/kafka-console-consumer.sh --from-beginning  --topic test2 --zookeeper node01:2181,node02:2181,node03:2181

六、Kafka中的數(shù)據(jù)不丟失機(jī)制

1. 生產(chǎn)者生產(chǎn)數(shù)據(jù)不丟失

發(fā)送消息方式

生產(chǎn)者發(fā)送給kafka數(shù)據(jù),可以采用同步方式異步方式

同步方式

發(fā)送一批數(shù)據(jù)給kafka后,等待kafka返回結(jié)果:

  1. 生產(chǎn)者等待10s,如果broker沒有給出ack響應(yīng),就認(rèn)為失敗。
  2. 生產(chǎn)者重試3次,如果還沒有響應(yīng),就報(bào)錯.

異步方式

發(fā)送一批數(shù)據(jù)給kafka,只是提供一個回調(diào)函數(shù):

  1. 先將數(shù)據(jù)保存在生產(chǎn)者端的buffer中。buffer大小是2萬條 。
  2. 滿足數(shù)據(jù)閾值或者數(shù)量閾值其中的一個條件就可以發(fā)送數(shù)據(jù)。
  3. 發(fā)送一批數(shù)據(jù)的大小是500條。

注:如果broker遲遲不給ack,而buffer又滿了,開發(fā)者可以設(shè)置是否直接清空buffer中的數(shù)據(jù)。

ack機(jī)制(確認(rèn)機(jī)制)

生產(chǎn)者數(shù)據(jù)發(fā)送出去,需要服務(wù)端返回一個確認(rèn)碼,即ack響應(yīng)碼;ack的響應(yīng)有三個狀態(tài)值0,1,-1

0:生產(chǎn)者只負(fù)責(zé)發(fā)送數(shù)據(jù),不關(guān)心數(shù)據(jù)是否丟失,丟失的數(shù)據(jù),需要再次發(fā)送

1:partition的leader收到數(shù)據(jù),不管follow是否同步完數(shù)據(jù),響應(yīng)的狀態(tài)碼為1

-1:所有的從節(jié)點(diǎn)都收到數(shù)據(jù),響應(yīng)的狀態(tài)碼為-1

如果broker端一直不返回ack狀態(tài),producer永遠(yuǎn)不知道是否成功;producer可以設(shè)置一個超時時間10s,超過時間認(rèn)為失敗。

2. broker中數(shù)據(jù)不丟失

在broker中,保證數(shù)據(jù)不丟失主要是通過副本因子(冗余),防止數(shù)據(jù)丟失。

3. 消費(fèi)者消費(fèi)數(shù)據(jù)不丟失

在消費(fèi)者消費(fèi)數(shù)據(jù)的時候,只要每個消費(fèi)者記錄好offset值即可,就能保證數(shù)據(jù)不丟失。也就是需要我們自己維護(hù)偏移量(offset),可保存在 Redis 中。

七、Kafka配置文件說明

Server.properties配置文件說明

#broker的全局唯一編號,不能重復(fù)
broker.id=0

#用來監(jiān)聽鏈接的端口,producer或consumer將在此端口建立連接
port=9092

#處理網(wǎng)絡(luò)請求的線程數(shù)量
num.network.threads=3

#用來處理磁盤IO的線程數(shù)量
num.io.threads=8

#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400

#接受套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400

#請求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600

#kafka運(yùn)行日志存放的路徑
log.dirs=/export/data/kafka/

#topic在當(dāng)前broker上的分片個數(shù)
num.partitions=2

#用來恢復(fù)和清理data下數(shù)據(jù)的線程數(shù)量
num.recovery.threads.per.data.dir=1

#segment文件保留的最長時間,超時將被刪除
log.retention.hours=168

#滾動生成新的segment文件的最大時間
log.roll.hours=1

#日志文件中每個segment的大小,默認(rèn)為1G
log.segment.bytes=1073741824

#周期性檢查文件大小的時間
log.retention.check.interval.ms=300000

#日志清理是否打開
log.cleaner.enable=true

#broker需要使用zookeeper保存meta數(shù)據(jù)
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181

#zookeeper鏈接超時時間
zookeeper.connection.timeout.ms=6000

#partion buffer中,消息的條數(shù)達(dá)到閾值,將觸發(fā)flush到磁盤
log.flush.interval.messages=10000

#消息buffer的時間,達(dá)到閾值,將觸發(fā)flush到磁盤
log.flush.interval.ms=3000

#刪除topic需要server.properties中設(shè)置delete.topic.enable=true否則只是標(biāo)記刪除
delete.topic.enable=true

#此處的host.name為本機(jī)IP(重要),如果不改,則客戶端會拋出:Producer connection to localhost:9092 unsuccessful 錯誤!
host.name=kafka01

advertised.host.name=192.168.140.128

producer生產(chǎn)者配置文件說明
#指定kafka節(jié)點(diǎn)列表,用于獲取metadata,不必全部指定
metadata.broker.list=node01:9092,node02:9092,node03:9092
# 指定分區(qū)處理類。默認(rèn)kafka.producer.DefaultPartitioner,表通過key哈希到對應(yīng)分區(qū)
#partitioner.class=kafka.producer.DefaultPartitioner
# 是否壓縮,默認(rèn)0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。壓縮后消息中會有頭來指明消息壓縮類型,故在消費(fèi)者端消息解壓是透明的無需指定。
compression.codec=none
# 指定序列化處理類
serializer.class=kafka.serializer.DefaultEncoder
# 如果要壓縮消息,這里指定哪些topic要壓縮消息,默認(rèn)empty,表示不壓縮。
#compressed.topics=

# 設(shè)置發(fā)送數(shù)據(jù)是否需要服務(wù)端的反饋,有三個值0,1,-1
# 0: producer不會等待broker發(fā)送ack 
# 1: 當(dāng)leader接收到消息之后發(fā)送ack 
# -1: 當(dāng)所有的follower都同步消息成功后發(fā)送ack. 
request.required.acks=0 

# 在向producer發(fā)送ack之前,broker允許等待的最大時間 ,如果超時,broker將會向producer發(fā)送一個error ACK.意味著上一次消息因?yàn)槟撤N原因未能成功(比如follower未能同步成功) 
request.timeout.ms=10000

# 同步還是異步發(fā)送消息,默認(rèn)“sync”表同步,'async'表異步。異步可以提高發(fā)送吞吐量,
也意味著消息將會在本地buffer中,并適時批量發(fā)送,但是也可能導(dǎo)致丟失未發(fā)送過去的消息
producer.type=sync

# 在async模式下,當(dāng)message被緩存的時間超過此值后,將會批量發(fā)送給broker,默認(rèn)為5000ms
# 此值和batch.num.messages協(xié)同工作.
queue.buffering.max.ms = 5000

# 在async模式下,producer端允許buffer的最大消息量
# 無論如何,producer都無法盡快的將消息發(fā)送給broker,從而導(dǎo)致消息在producer端大量沉積
# 此時,如果消息的條數(shù)達(dá)到閥值,將會導(dǎo)致producer端阻塞或者消息被拋棄,默認(rèn)為10000
queue.buffering.max.messages=20000

# 如果是異步,指定每次批量發(fā)送數(shù)據(jù)量,默認(rèn)為200
batch.num.messages=500

# 當(dāng)消息在producer端沉積的條數(shù)達(dá)到'queue.buffering.max.meesages'后 
# 阻塞一定時間后,隊(duì)列仍然沒有enqueue(producer仍然沒有發(fā)送出任何消息) 
# 此時producer可以繼續(xù)阻塞或者將消息拋棄,此timeout值用于控制'阻塞'的時間 
# -1: 無阻塞超時限制,消息不會被拋棄 
# 0:立即清空隊(duì)列,消息被拋棄 
queue.enqueue.timeout.ms=-1


# 當(dāng)producer接收到error ACK,或者沒有接收到ACK時,允許消息重發(fā)的次數(shù) 
# 因?yàn)閎roker并沒有完整的機(jī)制來避免消息重復(fù),所以當(dāng)網(wǎng)絡(luò)異常時(比如ACK丟失) 
# 有可能導(dǎo)致broker接收到重復(fù)的消息,默認(rèn)值為3.
message.send.max.retries=3

# producer刷新topic metada的時間間隔,producer需要知道partition leader的位置,以及當(dāng)前topic的情況 
# 因此producer需要一個機(jī)制來獲取最新的metadata,當(dāng)producer遇到特定錯誤時,將會立即刷新 
# (比如topic失效,partition丟失,leader失效等),此外也可以通過此參數(shù)來配置額外的刷新機(jī)制,默認(rèn)值600000 
topic.metadata.refresh.interval.ms=60000

consumer消費(fèi)者配置詳細(xì)說明:

# zookeeper連接服務(wù)器地址
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
# zookeeper的session過期時間,默認(rèn)5000ms,用于檢測消費(fèi)者是否掛掉
zookeeper.session.timeout.ms=5000
#當(dāng)消費(fèi)者掛掉,其他消費(fèi)者要等該指定時間才能檢查到并且觸發(fā)重新負(fù)載均衡
zookeeper.connection.timeout.ms=10000
# 指定多久消費(fèi)者更新offset到zookeeper中。注意offset更新時基于time而不是每次獲得的消息。一旦在更新zookeeper發(fā)生異常并重啟,將可能拿到已拿到過的消息
zookeeper.sync.time.ms=2000
#指定消費(fèi) 
group.id=itcast
# 當(dāng)consumer消費(fèi)一定量的消息之后,將會自動向zookeeper提交offset信息 
# 注意offset信息并不是每消費(fèi)一次消息就向zk提交一次,而是現(xiàn)在本地保存(內(nèi)存),并定期提交,默認(rèn)為true
auto.commit.enable=true
# 自動更新時間。默認(rèn)60 * 1000
auto.commit.interval.ms=1000
# 當(dāng)前consumer的標(biāo)識,可以設(shè)定,也可以有系統(tǒng)生成,主要用來跟蹤消息消費(fèi)情況,便于觀察
conusmer.id=xxx 
# 消費(fèi)者客戶端編號,用于區(qū)分不同客戶端,默認(rèn)客戶端程序自動產(chǎn)生
client.id=xxxx
# 最大取多少塊緩存到消費(fèi)者(默認(rèn)10)
queued.max.message.chunks=50
# 當(dāng)有新的consumer加入到group時,將會reblance,此后將會有partitions的消費(fèi)端遷移到新  的consumer上,如果一個consumer獲得了某個partition的消費(fèi)權(quán)限,那么它將會向zk注冊 'Partition Owner registry'節(jié)點(diǎn)信息,但是有可能此時舊的consumer尚沒有釋放此節(jié)點(diǎn), 此值用于控制,注冊節(jié)點(diǎn)的重試次數(shù). 
rebalance.max.retries=5

# 獲取消息的最大尺寸,broker不會像consumer輸出大于此值的消息chunk 每次feth將得到多條消息,此值為總大小,提升此值,將會消耗更多的consumer端內(nèi)存
fetch.min.bytes=6553600

# 當(dāng)消息的尺寸不足時,server阻塞的時間,如果超時,消息將立即發(fā)送給consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
# 如果zookeeper沒有offset值或offset值超出范圍。那么就給個初始的offset。有smallest、largest、anything可選,分別表示給當(dāng)前最小的offset、當(dāng)前最大的offset、拋異常。默認(rèn)largest
auto.offset.reset=smallest
# 指定序列化處理類
derializer.class=kafka.serializer.DefaultDecoder

八、CAP理論

1. 分布式系統(tǒng)當(dāng)中的CAP理論

分布式系統(tǒng)(distributed system)正變得越來越重要,大型網(wǎng)站幾乎都是分布式的。

分布式系統(tǒng)的最大難點(diǎn),就是各個節(jié)點(diǎn)的狀態(tài)如何同步。

為了解決各個節(jié)點(diǎn)之間的狀態(tài)同步問題,在1998年,由加州大學(xué)的計(jì)算機(jī)科學(xué)家 Eric Brewer 提出分布式系統(tǒng)的三個指標(biāo),分別是:

  • Consistency:一致性

  • Availability:可用性

  • Partition tolerance:分區(qū)容錯性

Eric Brewer 說,這三個指標(biāo)不可能同時做到。最多只能同時滿足其中兩個條件,這個結(jié)論就叫做 CAP 定理。

CAP理論是指:分布式系統(tǒng)中,一致性、可用性和分區(qū)容忍性最多只能同時滿足兩個。

一致性:Consistency

  • 通過某個節(jié)點(diǎn)的寫操作結(jié)果對后面通過其它節(jié)點(diǎn)的讀操作可見
  • 如果更新數(shù)據(jù)后,并發(fā)訪問情況下后續(xù)讀操作可立即感知該更新,稱為強(qiáng)一致性
  • 如果允許之后部分或者全部感知不到該更新,稱為弱一致性
  • 若在之后的一段時間(通常該時間不固定)后,一定可以感知到該更新,稱為最終一致性

可用性:Availability

  • 任何一個沒有發(fā)生故障的節(jié)點(diǎn)必須在有限的時間內(nèi)返回合理的結(jié)果

分區(qū)容錯性:Partition tolerance

  • 部分節(jié)點(diǎn)宕機(jī)或者無法與其它節(jié)點(diǎn)通信時,各分區(qū)間還可保持分布式系統(tǒng)的功能

一般而言,都要求保證分區(qū)容忍性。所以在CAP理論下,更多的是需要在可用性和一致性之間做權(quán)衡。

2. Partition tolerance

先看 Partition tolerance,中文叫做'分區(qū)容錯'。

大多數(shù)分布式系統(tǒng)都分布在多個子網(wǎng)絡(luò)。每個子網(wǎng)絡(luò)就叫做一個區(qū)(partition)。分區(qū)容錯的意思是,區(qū)間通信可能失敗。比如,一臺服務(wù)器放在中國,另一臺服務(wù)器放在美國,這就是兩個區(qū),它們之間可能無法通信。

上圖中,G1 和 G2 是兩臺跨區(qū)的服務(wù)器。G1 向 G2 發(fā)送一條消息,G2 可能無法收到。系統(tǒng)設(shè)計(jì)的時候,必須考慮到這種情況。

一般來說,分區(qū)容錯無法避免,因此可以認(rèn)為 CAP 的 P 總是存在的。即永遠(yuǎn)可能存在分區(qū)容錯這個問題

3. Consistency

Consistency 中文叫做'一致性'。意思是,寫操作之后的讀操作,必須返回該值。舉例來說,某條記錄是 v0,用戶向 G1 發(fā)起一個寫操作,將其改為 v1。

接下來,用戶的讀操作就會得到 v1。這就叫一致性。
問題是,用戶有可能向 G2 發(fā)起讀操作,由于 G2 的值沒有發(fā)生變化,因此返回的是 v0。G1 和 G2 讀操作的結(jié)果不一致,這就不滿足一致性了。

為了讓 G2 也能變?yōu)?v1,就要在 G1 寫操作的時候,讓 G1 向 G2 發(fā)送一條消息,要求 G2 也改成 v1。

這樣的話,用戶向 G2 發(fā)起讀操作,也能得到 v1。

4. Availability

Availability 中文叫做'可用性',意思是只要收到用戶的請求,服務(wù)器就必須給出回應(yīng)。用戶可以選擇向 G1 或 G2 發(fā)起讀操作。不管是哪臺服務(wù)器,只要收到請求,就必須告訴用戶,到底是 v0 還是 v1,否則就不滿足可用性。

九、Kafka中的CAP機(jī)制

kafka是一個分布式的消息隊(duì)列系統(tǒng),既然是一個分布式的系統(tǒng),那么就一定滿足CAP定律,那么在kafka當(dāng)中是如何遵循CAP定律的呢?kafka滿足CAP定律當(dāng)中的哪兩個呢?

kafka滿足的是CAP定律當(dāng)中的CA,其中Partition  tolerance通過的是一定的機(jī)制盡量的保證分區(qū)容錯性

其中C表示的是數(shù)據(jù)一致性。A表示數(shù)據(jù)可用性。

kafka首先將數(shù)據(jù)寫入到不同的分區(qū)里面去,每個分區(qū)又可能有好多個副本,數(shù)據(jù)首先寫入到leader分區(qū)里面去,讀寫的操作都是與leader分區(qū)進(jìn)行通信,保證了數(shù)據(jù)的一致性原則,也就是滿足了Consistency原則。

然后kafka通過分區(qū)副本機(jī)制,來保證了kafka當(dāng)中數(shù)據(jù)的可用性。但是也存在另外一個問題,就是副本分區(qū)當(dāng)中的數(shù)據(jù)與leader當(dāng)中的數(shù)據(jù)存在差別的問題如何解決,這個就是Partition tolerance的問題。

kafka為了解決Partition tolerance的問題,使用了ISR的同步策略,來盡最大可能減少Partition tolerance的問題

每個leader會維護(hù)一個ISR(a set of in-sync replicas,基本同步)列表。

ISR列表主要的作用就是決定哪些副本分區(qū)是可用的,也就是說可以將leader分區(qū)里面的數(shù)據(jù)同步到副本分區(qū)里面去,決定一個副本分區(qū)是否可用的條件有兩個:

  • replica.lag.time.max.ms=10000     副本分區(qū)與主分區(qū)心跳時間延遲

  • replica.lag.max.messages=4000    副本分區(qū)與主分區(qū)消息同步最大差

produce 請求被認(rèn)為完成時的確認(rèn)值:request.required.acks=0。

  • ack=0:producer不等待broker同步完成的確認(rèn),繼續(xù)發(fā)送下一條(批)信息。
  • ack=1(默認(rèn)):producer要等待leader成功收到數(shù)據(jù)并得到確認(rèn),才發(fā)送下一條message。
  • ack=-1:producer得到follwer確認(rèn),才發(fā)送下一條數(shù)據(jù)。

十、Kafka監(jiān)控及運(yùn)維

在開發(fā)工作中,消費(fèi)在Kafka集群中消息,數(shù)據(jù)變化是我們關(guān)注的問題,當(dāng)業(yè)務(wù)前提不復(fù)雜時,我們可以使用Kafka 命令提供帶有Zookeeper客戶端工具的工具,可以輕松完成我們的工作。隨著業(yè)務(wù)的復(fù)雜性,增加Group和 Topic,那么我們使用Kafka提供命令工具,已經(jīng)感到無能為力,那么Kafka監(jiān)控系統(tǒng)目前尤為重要,我們需要觀察 消費(fèi)者應(yīng)用的細(xì)節(jié)。

1. kafka-eagle概述

為了簡化開發(fā)者和服務(wù)工程師維護(hù)Kafka集群的工作有一個監(jiān)控管理工具,叫做 Kafka-eagle。這個管理工具可以很容易地發(fā)現(xiàn)分布在集群中的哪些topic分布不均勻,或者是分區(qū)在整個集群分布不均勻的的情況。它支持管理多個集群、選擇副本、副本重新分配以及創(chuàng)建Topic。同時,這個管理工具也是一個非常好的可以快速瀏覽這個集群的工具,

2. 環(huán)境和安裝

1. 環(huán)境要求

需要安裝jdk,啟動zk以及kafka的服務(wù)

2. 安裝步驟

  1. 下載源碼包

kafka-eagle官網(wǎng):http://download.kafka-eagle.org/

我們可以從官網(wǎng)上面直接下載最細(xì)的安裝包即可kafka-eagle-bin-1.3.2.tar.gz這個版本即可

代碼托管地址:

https://github.com/smartloli/kafka-eagle/releases

  1. 解壓

這里我們選擇將kafak-eagle安裝在第三臺。

直接將kafka-eagle安裝包上傳到node03服務(wù)器的/export/softwares路徑下,然后進(jìn)行解壓node03服務(wù)器執(zhí)行一下命令進(jìn)行解壓。

  1. 準(zhǔn)備數(shù)據(jù)庫

kafka-eagle需要使用一個數(shù)據(jù)庫來保存一些元數(shù)據(jù)信息,我們這里直接使用msyql數(shù)據(jù)庫來保存即可,在node03服務(wù)器執(zhí)行以下命令創(chuàng)建一個mysql數(shù)據(jù)庫即可。

進(jìn)入mysql客戶端:

create database eagle;
  1. 修改kafak-eagle配置文件

執(zhí)行以下命令修改kafak-eagle配置文件:

vim system-config.properties

修改為如下:

kafka.eagle.zk.cluster.alias=cluster1,cluster2
cluster1.zk.list=node01:2181,node02:2181,node03:2181
cluster2.zk.list=node01:2181,node02:2181,node03:2181

kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://node03:3306/eagle
kafka.eagle.username=root
kafka.eagle.password=123456
  1. 配置環(huán)境變量

kafka-eagle必須配置環(huán)境變量,node03服務(wù)器執(zhí)行以下命令來進(jìn)行配置環(huán)境變量: vim /etc/profile

export KE_HOME=/opt//kafka-eagle-bin-1.3.2/kafka-eagle-web-1.3.2
export PATH=:$KE_HOME/bin:$PATH

修改立即生效,執(zhí)行: source /etc/profile

  1. 啟動kafka-eagle

執(zhí)行以下界面啟動kafka-eagle:

cd kafka-eagle-web-1.3.2/bin
chmod u+x ke.sh
./ke.sh start
  1. 主界面

訪問kafka-eagle

http://node03:8048/ke/account/signin?/ke/

用戶名:admin

密碼:123456

十一、Kafka大廠面試題

1. 為什么要使用 kafka?

  1. 緩沖和削峰:上游數(shù)據(jù)時有突發(fā)流量,下游可能扛不住,或者下游沒有足夠多的機(jī)器來保證冗余,kafka在中間可以起到一個緩沖的作用,把消息暫存在kafka中,下游服務(wù)就可以按照自己的節(jié)奏進(jìn)行慢慢處理。

  2. 解耦和擴(kuò)展性:項(xiàng)目開始的時候,并不能確定具體需求。消息隊(duì)列可以作為一個接口層,解耦重要的業(yè)務(wù)流程。只需要遵守約定,針對數(shù)據(jù)編程即可獲取擴(kuò)展能力。

  3. 冗余:可以采用一對多的方式,一個生產(chǎn)者發(fā)布消息,可以被多個訂閱topic的服務(wù)消費(fèi)到,供多個毫無關(guān)聯(lián)的業(yè)務(wù)使用。

  4. 健壯性:消息隊(duì)列可以堆積請求,所以消費(fèi)端業(yè)務(wù)即使短時間死掉,也不會影響主要業(yè)務(wù)的正常進(jìn)行。

  5. 異步通信:很多時候,用戶不想也不需要立即處理消息。消息隊(duì)列提供了異步處理機(jī)制,允許用戶把一個消息放入隊(duì)列,但并不立即處理它。想向隊(duì)列中放入多少消息就放多少,然后在需要的時候再去處理它們。

2. Kafka消費(fèi)過的消息如何再消費(fèi)?

kafka消費(fèi)消息的offset是定義在zookeeper中的,如果想重復(fù)消費(fèi)kafka的消息,可以在redis中自己記錄offset的checkpoint點(diǎn)(n個),當(dāng)想重復(fù)消費(fèi)消息時,通過讀取redis中的checkpoint點(diǎn)進(jìn)行zookeeper的offset重設(shè),這樣就可以達(dá)到重復(fù)消費(fèi)消息的目的了

3. kafka的數(shù)據(jù)是放在磁盤上還是內(nèi)存上,為什么速度會快?

kafka使用的是磁盤存儲。

速度快是因?yàn)椋?/p>

  1. 順序?qū)懭耄阂驗(yàn)橛脖P是機(jī)械結(jié)構(gòu),每次讀寫都會尋址->寫入,其中尋址是一個“機(jī)械動作”,它是耗時的。所以硬盤 “討厭”隨機(jī)I/O, 喜歡順序I/O。為了提高讀寫硬盤的速度,Kafka就是使用順序I/O。
  2. Memory Mapped Files(內(nèi)存映射文件):64位操作系統(tǒng)中一般可以表示20G的數(shù)據(jù)文件,它的工作原理是直接利用操作系統(tǒng)的Page來實(shí)現(xiàn)文件到物理內(nèi)存的直接映射。完成映射之后你對物理內(nèi)存的操作會被同步到硬盤上。
  3. Kafka高效文件存儲設(shè)計(jì):Kafka把topic中一個parition大文件分成多個小文件段,通過多個小文件段,就容易定期清除或刪除已經(jīng)消費(fèi)完文件,減少磁盤占用。通過索引信息可以快速定位message和確定response的大小。通過index元數(shù)據(jù)全部映射到memory(內(nèi)存映射文件),可以避免segment file的IO磁盤操作。通過索引文件稀疏存儲,可以大幅降低index文件元數(shù)據(jù)占用空間大小。

  1. Kafka解決查詢效率的手段之一是將數(shù)據(jù)文件分段,比如有100條Message,它們的offset是從0到99。假設(shè)將數(shù)據(jù)文件分成5段,第一段為0-19,第二段為20-39,以此類推,每段放在一個單獨(dú)的數(shù)據(jù)文件里面,數(shù)據(jù)文件以該段中 小的offset命名。這樣在查找指定offset的Message的時候,用二分查找就可以定位到該Message在哪個段中。
  2. 為數(shù)據(jù)文件建 索引數(shù)據(jù)文件分段 使得可以在一個較小的數(shù)據(jù)文件中查找對應(yīng)offset的Message 了,但是這依然需要順序掃描才能找到對應(yīng)offset的Message。為了進(jìn)一步提高查找的效率,Kafka為每個分段后的數(shù)據(jù)文件建立了索引文件,文件名與數(shù)據(jù)文件的名字是一樣的,只是文件擴(kuò)展名為.index。

4. Kafka數(shù)據(jù)怎么保障不丟失?

分三個點(diǎn)說,一個是生產(chǎn)者端,一個消費(fèi)者端,一個broker端。

  1. 生產(chǎn)者數(shù)據(jù)的不丟失

kafka的ack機(jī)制:在kafka發(fā)送數(shù)據(jù)的時候,每次發(fā)送消息都會有一個確認(rèn)反饋機(jī)制,確保消息正常的能夠被收到,其中狀態(tài)有0,1,-1。

如果是同步模式:
ack設(shè)置為0,風(fēng)險(xiǎn)很大,一般不建議設(shè)置為0。即使設(shè)置為1,也會隨著leader宕機(jī)丟失數(shù)據(jù)。所以如果要嚴(yán)格保證生產(chǎn)端數(shù)據(jù)不丟失,可設(shè)置為-1。

如果是異步模式:
也會考慮ack的狀態(tài),除此之外,異步模式下的有個buffer,通過buffer來進(jìn)行控制數(shù)據(jù)的發(fā)送,有兩個值來進(jìn)行控制,時間閾值與消息的數(shù)量閾值,如果buffer滿了數(shù)據(jù)還沒有發(fā)送出去,有個選項(xiàng)是配置是否立即清空buffer??梢栽O(shè)置為-1,永久阻塞,也就數(shù)據(jù)不再生產(chǎn)。異步模式下,即使設(shè)置為-1。也可能因?yàn)槌绦騿T的不科學(xué)操作,操作數(shù)據(jù)丟失,比如kill -9,但這是特別的例外情況。

注:
ack=0:producer不等待broker同步完成的確認(rèn),繼續(xù)發(fā)送下一條(批)信息。
ack=1(默認(rèn)):producer要等待leader成功收到數(shù)據(jù)并得到確認(rèn),才發(fā)送下一條message。
ack=-1:producer得到follwer確認(rèn),才發(fā)送下一條數(shù)據(jù)。

  1. 消費(fèi)者數(shù)據(jù)的不丟失

通過offset commit 來保證數(shù)據(jù)的不丟失,kafka自己記錄了每次消費(fèi)的offset數(shù)值,下次繼續(xù)消費(fèi)的時候,會接著上次的offset進(jìn)行消費(fèi)。

而offset的信息在kafka0.8版本之前保存在zookeeper中,在0.8版本之后保存到topic中,即使消費(fèi)者在運(yùn)行過程中掛掉了,再次啟動的時候會找到offset的值,找到之前消費(fèi)消息的位置,接著消費(fèi),由于 offset 的信息寫入的時候并不是每條消息消費(fèi)完成后都寫入的,所以這種情況有可能會造成重復(fù)消費(fèi),但是不會丟失消息。

唯一例外的情況是,我們在程序中給原本做不同功能的兩個consumer組設(shè)置KafkaSpoutConfig.bulider.setGroupid的時候設(shè)置成了一樣的groupid,這種情況會導(dǎo)致這兩個組共享同一份數(shù)據(jù),就會產(chǎn)生組A消費(fèi)partition1,partition2中的消息,組B消費(fèi)partition3的消息,這樣每個組消費(fèi)的消息都會丟失,都是不完整的。為了保證每個組都獨(dú)享一份消息數(shù)據(jù),groupid一定不要重復(fù)才行。

  1. kafka集群中的broker的數(shù)據(jù)不丟失

每個broker中的partition我們一般都會設(shè)置有replication(副本)的個數(shù),生產(chǎn)者寫入的時候首先根據(jù)分發(fā)策略(有partition按partition,有key按key,都沒有輪詢)寫入到leader中,follower(副本)再跟leader同步數(shù)據(jù),這樣有了備份,也可以保證消息數(shù)據(jù)的不丟失。

5. 采集數(shù)據(jù)為什么選擇kafka?

采集層 主要可以使用Flume, Kafka等技術(shù)。

Flume:Flume 是管道流方式,提供了很多的默認(rèn)實(shí)現(xiàn),讓用戶通過參數(shù)部署,及擴(kuò)展API.

Kafka:Kafka是一個可持久化的分布式的消息隊(duì)列。Kafka 是一個非常通用的系統(tǒng)。你可以有許多生產(chǎn)者和很多的消費(fèi)者共享多個主題Topics。

相比之下,Flume是一個專用工具被設(shè)計(jì)為旨在往HDFS,HBase發(fā)送數(shù)據(jù)。它對HDFS有特殊的優(yōu)化,并且集成了Hadoop的安全特性。

所以,Cloudera 建議如果數(shù)據(jù)被多個系統(tǒng)消費(fèi)的話,使用kafka;如果數(shù)據(jù)被設(shè)計(jì)給Hadoop使用,使用Flume。

6. kafka 重啟是否會導(dǎo)致數(shù)據(jù)丟失?

  1. kafka是將數(shù)據(jù)寫到磁盤的,一般數(shù)據(jù)不會丟失。
  2. 但是在重啟kafka過程中,如果有消費(fèi)者消費(fèi)消息,那么kafka如果來不及提交offset,可能會造成數(shù)據(jù)的不準(zhǔn)確(丟失或者重復(fù)消費(fèi))。

7. kafka 宕機(jī)了如何解決?

  1. 先考慮業(yè)務(wù)是否受到影響

kafka 宕機(jī)了,首先我們考慮的問題應(yīng)該是所提供的服務(wù)是否因?yàn)殄礄C(jī)的機(jī)器而受到影響,如果服務(wù)提供沒問題,如果實(shí)現(xiàn)做好了集群的容災(zāi)機(jī)制,那么這塊就不用擔(dān)心了。

  1. 節(jié)點(diǎn)排錯與恢復(fù)

想要恢復(fù)集群的節(jié)點(diǎn),主要的步驟就是通過日志分析來查看節(jié)點(diǎn)宕機(jī)的原因,從而解決,重新恢復(fù)節(jié)點(diǎn)。

8. 為什么Kafka不支持讀寫分離?

在 Kafka 中,生產(chǎn)者寫入消息、消費(fèi)者讀取消息的操作都是與 leader 副本進(jìn)行交互的,從 而實(shí)現(xiàn)的是一種主寫主讀的生產(chǎn)消費(fèi)模型。Kafka 并不支持主寫從讀,因?yàn)橹鲗憦淖x有 2 個很明顯的缺點(diǎn):

  1. 數(shù)據(jù)一致性問題:數(shù)據(jù)從主節(jié)點(diǎn)轉(zhuǎn)到從節(jié)點(diǎn)必然會有一個延時的時間窗口,這個時間 窗口會導(dǎo)致主從節(jié)點(diǎn)之間的數(shù)據(jù)不一致。某一時刻,在主節(jié)點(diǎn)和從節(jié)點(diǎn)中 A 數(shù)據(jù)的值都為 X, 之后將主節(jié)點(diǎn)中 A 的值修改為 Y,那么在這個變更通知到從節(jié)點(diǎn)之前,應(yīng)用讀取從節(jié)點(diǎn)中的 A 數(shù)據(jù)的值并不為最新的 Y,由此便產(chǎn)生了數(shù)據(jù)不一致的問題。

  2. 延時問題:類似 Redis 這種組件,數(shù)據(jù)從寫入主節(jié)點(diǎn)到同步至從節(jié)點(diǎn)中的過程需要經(jīng)歷 網(wǎng)絡(luò)→主節(jié)點(diǎn)內(nèi)存→網(wǎng)絡(luò)→從節(jié)點(diǎn)內(nèi)存 這幾個階段,整個過程會耗費(fèi)一定的時間。而在 Kafka 中,主從同步會比 Redis 更加耗時,它需要經(jīng)歷 網(wǎng)絡(luò)→主節(jié)點(diǎn)內(nèi)存→主節(jié)點(diǎn)磁盤→網(wǎng)絡(luò)→從節(jié) 點(diǎn)內(nèi)存→從節(jié)點(diǎn)磁盤 這幾個階段。對延時敏感的應(yīng)用而言,主寫從讀的功能并不太適用。

而kafka的主寫主讀的優(yōu)點(diǎn)就很多了:

  1. 可以簡化代碼的實(shí)現(xiàn)邏輯,減少出錯的可能;
  2. 將負(fù)載粒度細(xì)化均攤,與主寫從讀相比,不僅負(fù)載效能更好,而且對用戶可控;
  3. 沒有延時的影響;
  4. 在副本穩(wěn)定的情況下,不會出現(xiàn)數(shù)據(jù)不一致的情況。

9. kafka數(shù)據(jù)分區(qū)和消費(fèi)者的關(guān)系?

每個分區(qū)只能由同一個消費(fèi)組內(nèi)的一個消費(fèi)者(consumer)來消費(fèi),可以由不同的消費(fèi)組的消費(fèi)者來消費(fèi),同組的消費(fèi)者則起到并發(fā)的效果。

10. kafka的數(shù)據(jù)offset讀取流程

  1. 連接ZK集群,從ZK中拿到對應(yīng)topic的partition信息和partition的Leader的相關(guān)信息

  2. 連接到對應(yīng)Leader對應(yīng)的broker

  3. consumer將?自?己保存的offset發(fā)送給Leader

  4. Leader根據(jù)offset等信息定位到segment(索引?文件和?日志?文件)

  5. 根據(jù)索引?文件中的內(nèi)容,定位到?日志?文件中該偏移量量對應(yīng)的開始位置讀取相應(yīng)?長度的數(shù)據(jù)并返回給consumer

11. kafka內(nèi)部如何保證順序,結(jié)合外部組件如何保證消費(fèi)者的順序?

kafka只能保證partition內(nèi)是有序的,但是partition間的有序是沒辦法的。愛奇藝的搜索架構(gòu),是從業(yè)務(wù)上把需要有序的打到同?個partition。

12. Kafka消息數(shù)據(jù)積壓,Kafka消費(fèi)能力不足怎么處理?

  1. 如果是Kafka消費(fèi)能力不足,則可以考慮增加Topic的分區(qū)數(shù),并且同時提升消費(fèi)組的消費(fèi)者數(shù)量,消費(fèi)者數(shù)=分區(qū)數(shù)。(兩者缺一不可)

  2. 如果是下游的數(shù)據(jù)處理不及時:提高每批次拉取的數(shù)量。批次拉取數(shù)據(jù)過少(拉取數(shù)據(jù)/處理時間<生產(chǎn)速度),使處理的數(shù)據(jù)小于生產(chǎn)的數(shù)據(jù),也會造成數(shù)據(jù)積壓。

13.  Kafka單條日志傳輸大小

kafka對于消息體的大小默認(rèn)為單條最大值是1M但是在我們應(yīng)用場景中, 常常會出現(xiàn)一條消息大于1M,如果不對kafka進(jìn)行配置。則會出現(xiàn)生產(chǎn)者無法將消息推送到kafka或消費(fèi)者無法去消費(fèi)kafka里面的數(shù)據(jù), 這時我們就要對kafka進(jìn)行以下配置:server.properties

replica.fetch.max.bytes: 1048576  broker可復(fù)制的消息的最大字節(jié)數(shù), 默認(rèn)為1M
message.max.bytes: 1000012   kafka 會接收單個消息size的最大限制, 默認(rèn)為1M左右

注意:message.max.bytes必須小于等于replica.fetch.max.bytes,否則就會導(dǎo)致replica之間數(shù)據(jù)同步失敗。

- EOF -
本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊舉報(bào)
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
生活服務(wù)
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服