Kafka 涉及的知識點(diǎn)如下圖所示,本文將逐一講解:
消息(Message)是指在應(yīng)用之間傳送的數(shù)據(jù),消息可以非常簡單,比如只包含文本字符串,也可以更復(fù)雜,可能包含嵌入對象。消息隊(duì)列(Message Queue)是一種應(yīng)用間的通信方式,消息發(fā)送后可以立即返回,有消息系統(tǒng)來確保信息的可靠專遞,消息發(fā)布者只管把消息發(fā)布到MQ中而不管誰來取,消息使用者只管從MQ中取消息而不管誰發(fā)布的,這樣發(fā)布者和使用者都不用知道對方的存在。
消息隊(duì)列在實(shí)際應(yīng)用中包括如下四個場景:
應(yīng)用耦合:多應(yīng)用間通過消息隊(duì)列對同一消息進(jìn)行處理,避免調(diào)用接口失敗導(dǎo)致整個過程失?。?/p>
異步處理:多應(yīng)用對消息隊(duì)列中同一消息進(jìn)行處理,應(yīng)用間并發(fā)處理消息,相比串行處理,減少處理時間;
限流削峰:廣泛應(yīng)用于秒殺或搶購活動中,避免流量過大導(dǎo)致應(yīng)用系統(tǒng)掛掉的情況;
消息驅(qū)動的系統(tǒng):系統(tǒng)分為消息隊(duì)列、消息生產(chǎn)者、消息消費(fèi)者,生產(chǎn)者負(fù)責(zé)產(chǎn)生消息,消費(fèi)者(可能有多個)負(fù)責(zé)對消息進(jìn)行處理;
下面詳細(xì)介紹上述四個場景以及消息隊(duì)列如何在上述四個場景中使用:
具體場景:用戶為了使用某個應(yīng)用,進(jìn)行注冊,系統(tǒng)需要發(fā)送注冊郵件并驗(yàn)證短信。對這兩個操作的處理方式有兩種:串行及并行。
串行方式:新注冊信息生成后,先發(fā)送注冊郵件,再發(fā)送驗(yàn)證短信;
在這種方式下,需要最終發(fā)送驗(yàn)證短信后再返回給客戶端。
并行處理:新注冊信息寫入后,由發(fā)短信和發(fā)郵件并行處理;
在這種方式下,發(fā)短信和發(fā)郵件 需處理完成后再返回給客戶端。假設(shè)以上三個子系統(tǒng)處理的時間均為50ms,且不考慮網(wǎng)絡(luò)延遲,則總的處理時間:
串行:50+50+50=150ms
并行:50+50 = 100ms
若使用消息隊(duì)列:
在寫入消息隊(duì)列后立即返回成功給客戶端,則總的響應(yīng)時間依賴于寫入消息隊(duì)列的時間,而寫入消息隊(duì)列的時間本身是可以很快的,基本可以忽略不計(jì),因此總的處理時間相比串行提高了2倍,相比并行提高了一倍;
具體場景:用戶使用QQ相冊上傳一張圖片,人臉識別系統(tǒng)會對該圖片進(jìn)行人臉識別,一般的做法是,服務(wù)器接收到圖片后,圖片上傳系統(tǒng)立即調(diào)用人臉識別系統(tǒng),調(diào)用完成后再返回成功,如下圖所示:
該方法有如下缺點(diǎn):
人臉識別系統(tǒng)被調(diào)失敗,導(dǎo)致圖片上傳失敗;
延遲高,需要人臉識別系統(tǒng)處理完成后,再返回給客戶端,即使用戶并不需要立即知道結(jié)果;
圖片上傳系統(tǒng)與人臉識別系統(tǒng)之間互相調(diào)用,需要做耦合;
若使用消息隊(duì)列:
客戶端上傳圖片后,圖片上傳系統(tǒng)將圖片信息如uin、批次寫入消息隊(duì)列,直接返回成功;而人臉識別系統(tǒng)則定時從消息隊(duì)列中取數(shù)據(jù),完成對新增圖片的識別。
此時圖片上傳系統(tǒng)并不需要關(guān)心人臉識別系統(tǒng)是否對這些圖片信息的處理、以及何時對這些圖片信息進(jìn)行處理。事實(shí)上,由于用戶并不需要立即知道人臉識別結(jié)果,人臉識別系統(tǒng)可以選擇不同的調(diào)度策略,按照閑時、忙時、正常時間,對隊(duì)列中的圖片信息進(jìn)行處理。
具體場景:購物網(wǎng)站開展秒殺活動,一般由于瞬時訪問量過大,服務(wù)器接收過大,會導(dǎo)致流量暴增,相關(guān)系統(tǒng)無法處理請求甚至崩潰。而加入消息隊(duì)列后,系統(tǒng)可以從消息隊(duì)列中取數(shù)據(jù),相當(dāng)于消息隊(duì)列做了一次緩沖。
請求先入消息隊(duì)列,而不是由業(yè)務(wù)處理系統(tǒng)直接處理,做了一次緩沖,極大地減少了業(yè)務(wù)處理系統(tǒng)的壓力;
隊(duì)列長度可以做限制,事實(shí)上,秒殺時,后入隊(duì)列的用戶無法秒殺到商品,這些請求可以直接被拋棄,返回活動已結(jié)束或商品已售完信息;
4.消息驅(qū)動的系統(tǒng)
具體場景:用戶新上傳了一批照片,人臉識別系統(tǒng)需要對這個用戶的所有照片進(jìn)行聚類,聚類完成后由對賬系統(tǒng)重新生成用戶的人臉?biāo)饕?加快查詢)。這三個子系統(tǒng)間由消息隊(duì)列連接起來,前一個階段的處理結(jié)果放入隊(duì)列中,后一個階段從隊(duì)列中獲取消息繼續(xù)處理。
避免了直接調(diào)用下一個系統(tǒng)導(dǎo)致當(dāng)前系統(tǒng)失?。?/p>
每個子系統(tǒng)對于消息的處理方式可以更為靈活,可以選擇收到消息時就處理,可以選擇定時處理,也可以劃分時間段按不同處理速度處理;
消息隊(duì)列包括兩種模式,點(diǎn)對點(diǎn)模式(point to point, queue)和發(fā)布/訂閱模式(publish/subscribe,topic)
點(diǎn)對點(diǎn)模式下包括三個角色:
點(diǎn)對點(diǎn)模式特點(diǎn):
發(fā)布/訂閱模式下包括三個角色:
發(fā)布/訂閱模式特點(diǎn):
RabbitMQ 2007年發(fā)布,是一個在AMQP(高級消息隊(duì)列協(xié)議)基礎(chǔ)上完成的,可復(fù)用的企業(yè)消息系統(tǒng),是當(dāng)前最主流的消息中間件之一。
ActiveMQ是由Apache出品,ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)。它非??焖伲С侄喾N語言的客戶端和協(xié)議,而且可以非常容易的嵌入到企業(yè)的應(yīng)用環(huán)境中,并有許多高級功能。
RocketMQ出自 阿里公司的開源產(chǎn)品,用 Java 語言實(shí)現(xiàn),在設(shè)計(jì)時參考了 Kafka,并做出了自己的一些改進(jìn),消息可靠性上比 Kafka 更好。RocketMQ在阿里集團(tuán)被廣泛應(yīng)用在訂單,交易,充值,流計(jì)算,消息推送,日志流式處理等。
Apache Kafka是一個分布式消息發(fā)布訂閱系統(tǒng)。它最初由LinkedIn公司基于獨(dú)特的設(shè)計(jì)實(shí)現(xiàn)為一個分布式的提交日志系統(tǒng)( a distributed commit log),之后成為Apache項(xiàng)目的一部分。Kafka系統(tǒng)快速、可擴(kuò)展并且可持久化。它的分區(qū)特性,可復(fù)制和可容錯都是其不錯的特性。
Apahce Pulasr是一個企業(yè)級的發(fā)布-訂閱消息系統(tǒng),最初是由雅虎開發(fā),是下一代云原生分布式消息流平臺,集消息、存儲、輕量化函數(shù)式計(jì)算為一體,采用計(jì)算與存儲分離架構(gòu)設(shè)計(jì),支持多租戶、持久化存儲、多機(jī)房跨區(qū)域數(shù)據(jù)復(fù)制,具有強(qiáng)一致性、高吞吐、低延時及高可擴(kuò)展性等流數(shù)據(jù)存儲特性。
Pulsar 非常靈活:它既可以應(yīng)用于像 Kafka 這樣的分布式日志應(yīng)用場景,也可以應(yīng)用于像 RabbitMQ 這樣的純消息傳遞系統(tǒng)場景。它支持多種類型的訂閱、多種交付保證、保留策略以及處理模式演變的方法,以及其他諸多特性。
內(nèi)置多租戶:不同的團(tuán)隊(duì)可以使用相同的集群并將其隔離,解決了許多管理難題。它支持隔離、身份驗(yàn)證、授權(quán)和配額;
多層體系結(jié)構(gòu):Pulsar 將所有 topic 數(shù)據(jù)存儲在由 Apache BookKeeper 支持的專業(yè)數(shù)據(jù)層中。存儲和消息傳遞的分離解決了擴(kuò)展、重新平衡和維護(hù)集群的許多問題。它還提高了可靠性,幾乎不可能丟失數(shù)據(jù)。另外,在讀取數(shù)據(jù)時可以直連 BookKeeper,且不影響實(shí)時攝取。例如,可以使用 Presto 對 topic 執(zhí)行 SQL 查詢,類似于 KSQL,但不會影響實(shí)時數(shù)據(jù)處理;
虛擬 topic:由于采用 n 層體系結(jié)構(gòu),因此對 topic 的數(shù)量沒有限制,topic 及其存儲是分離的。用戶還可以創(chuàng)建非持久性 topic;
N 層存儲:Kafka 的一個問題是,存儲費(fèi)用可能變高。因此,它很少用于存儲'冷'數(shù)據(jù),并且消息經(jīng)常被刪除,Apache Pulsar 可以借助分層存儲自動將舊數(shù)據(jù)卸載到 Amazon S3 或其他數(shù)據(jù)存儲系統(tǒng),并且仍然向客戶端展示透明視圖;Pulsar 客戶端可以從時間開始節(jié)點(diǎn)讀取,就像所有消息都存在于日志中一樣;
Pulsar 的多層架構(gòu)影響了存儲數(shù)據(jù)的方式。Pulsar 將 topic 分區(qū)劃分為分片(segment),然后將這些分片存儲在 Apache BookKeeper 的存儲節(jié)點(diǎn)上,以提高性能、可伸縮性和可用性。
Pulsar 的無限分布式日志以分片為中心,借助擴(kuò)展日志存儲(通過 Apache BookKeeper)實(shí)現(xiàn),內(nèi)置分層存儲支持,因此分片可以均勻地分布在存儲節(jié)點(diǎn)上。
由于與任一給定 topic 相關(guān)的數(shù)據(jù)都不會與特定存儲節(jié)點(diǎn)進(jìn)行捆綁,因此很容易替換存儲節(jié)點(diǎn)或縮擴(kuò)容。另外,集群中最小或最慢的節(jié)點(diǎn)也不會成為存儲或帶寬的短板。
Pulsar 架構(gòu)能實(shí)現(xiàn)分區(qū)管理,負(fù)載均衡,因此使用 Pulsar 能夠快速擴(kuò)展并達(dá)到高可用。這兩點(diǎn)至關(guān)重要,所以 Pulsar 非常適合用來構(gòu)建關(guān)鍵任務(wù)服務(wù),如金融應(yīng)用場景的計(jì)費(fèi)平臺,電子商務(wù)和零售商的交易處理系統(tǒng),金融機(jī)構(gòu)的實(shí)時風(fēng)險(xiǎn)控制系統(tǒng)等。
通過性能強(qiáng)大的 Netty 架構(gòu),數(shù)據(jù)從 producers 到 broker,再到 bookie 的轉(zhuǎn)移都是零拷貝,不會生成副本。這一特性對所有流應(yīng)用場景都非常友好,因?yàn)閿?shù)據(jù)直接通過網(wǎng)絡(luò)或磁盤進(jìn)行傳輸,沒有任何性能損失。
Pulsar 的消費(fèi)模型采用了流拉取的方式。流拉取是長輪詢的改進(jìn)版,不僅實(shí)現(xiàn)了單個調(diào)用和請求之間的零等待,還可以提供雙向消息流。通過流拉取模型,Pulsar 實(shí)現(xiàn)了端到端的低延遲,這種低延遲比所有現(xiàn)有的長輪詢消息系統(tǒng)(如 Kafka)都低。
更多功能:Pulsar Function、多租戶、Schema registry、n 層存儲、多種消費(fèi)模式和持久性模式等;
更大的靈活性:3 種訂閱類型(獨(dú)占,共享和故障轉(zhuǎn)移),用戶可以在一個訂閱上管理多個 topic;
易于操作運(yùn)維:架構(gòu)解耦和 n 層存儲;
與 Presto 的 SQL 集成,可直接查詢存儲而不會影響 broker;
借助 n 層自動存儲選項(xiàng),可以更低成本地存儲;
Pulsar 并不完美,Pulsar 也存在一些問題:
相對缺乏支持、文檔和案例;
n 層體系結(jié)構(gòu)導(dǎo)致需要更多組件:BookKeeper;
插件和客戶端相對 Kafka 較少;
云中的支持較少,Confluent 具有托管云產(chǎn)品。
同時需要像 RabbitMQ 這樣的隊(duì)列和 Kafka 這樣的流處理程序;
需要易用的地理復(fù)制;
實(shí)現(xiàn)多租戶,并確保每個團(tuán)隊(duì)的訪問權(quán)限;
需要長時間保留消息,并且不想將其卸載到另一個存儲中;
需要高性能,基準(zhǔn)測試表明 Pulsar 提供了更低的延遲和更高的吞吐量;
總之,Pulsar還比較新,社區(qū)不完善,用的企業(yè)比較少,網(wǎng)上有價值的討論和問題的解決比較少,遠(yuǎn)沒有Kafka生態(tài)系統(tǒng)龐大,且用戶量非常龐大,目前Kafka依舊是大數(shù)據(jù)領(lǐng)域消息隊(duì)列的王者!所以我們還是以Kafka為主!
官網(wǎng):http://kafka.apache.org/
kafka是最初由linkedin公司開發(fā)的,使用scala語言編寫,kafka是一個分布式,分區(qū)的,多副本的,多訂閱者的日志系統(tǒng)(分布式MQ系統(tǒng)),可以用于搜索日志,監(jiān)控日志,訪問日志等。
Kafka is a distributed, partitioned, replicated commit logservice。它提供了類似于JMS的特性,但是在設(shè)計(jì)實(shí)現(xiàn)上完全不同,此外它并不是JMS規(guī)范的實(shí)現(xiàn)。
kafka對消息保存時根據(jù)Topic進(jìn)行歸類,發(fā)送消息者成為Producer, 消息接受者成為Consumer, 此外kafka集群有多個kafka實(shí)例組成,每個實(shí)例(server)成為broker。無論是kafka集群,還是producer和consumer都依賴于zookeeper來保證系統(tǒng)可用性集群保存一些meta信息。
apache kafka是一個分布式發(fā)布-訂閱消息系統(tǒng)和一個強(qiáng)大的隊(duì)列,可以處理大量的數(shù)據(jù),并使能夠?qū)⑾囊粋€端點(diǎn)傳遞到另一個端點(diǎn),kafka適合離線和在線消息消費(fèi)。
kafka消息保留在磁盤上,并在集群內(nèi)復(fù)制以防止數(shù)據(jù)丟失。kafka構(gòu)建在zookeeper同步服務(wù)之上。它與apache和spark非常好的集成,應(yīng)用于實(shí)時流式數(shù)據(jù)分析。
kafka 通常用于操作監(jiān)控?cái)?shù)據(jù)。這設(shè)計(jì)聚合來自分布式應(yīng)用程序的統(tǒng)計(jì)信息,以產(chǎn)生操作的數(shù)據(jù)集中反饋
kafka可用于跨組織從多個服務(wù)器收集日志,并使他們以標(biāo)準(zhǔn)的格式提供給多個服務(wù)器。
流式處理框架(spark,storm,?ink)重主題中讀取數(shù)據(jù),對齊進(jìn)行處理,并將處理后的數(shù)據(jù)寫入新的主題,供 用戶和應(yīng)用程序使用,kafka的強(qiáng)耐久性在流處理的上下文中也非常的有用。
允許應(yīng)用程序發(fā)布記錄流至一個或者多個kafka的主題(topics)。
允許應(yīng)用程序訂閱一個或者多個主題,并處理這些主題接收到的記錄流。
3. StreamsAPI
允許應(yīng)用程序充當(dāng)流處理器(stream processor),從一個或者多個主題獲取輸入流,并生產(chǎn)一個輸出流到一個或 者多個主題,能夠有效的變化輸入流為輸出流。
允許構(gòu)建和運(yùn)行可重用的生產(chǎn)者或者消費(fèi)者,能夠把kafka主題連接到現(xiàn)有的應(yīng)用程序或數(shù)據(jù)系統(tǒng)。例如:一個連接到關(guān)系數(shù)據(jù)庫的連接器可能會獲取每個表的變化。
注:在Kafka 2.8.0 版本,移除了對Zookeeper的依賴,通過KRaft進(jìn)行自己的集群管理,使用Kafka內(nèi)部的Quorum控制器來取代ZooKeeper,因此用戶第一次可在完全不需要ZooKeeper的情況下執(zhí)行Kafka,這不只節(jié)省運(yùn)算資源,并且也使得Kafka效能更好,還可支持規(guī)模更大的集群。
過去Apache ZooKeeper是Kafka這類分布式系統(tǒng)的關(guān)鍵,ZooKeeper扮演協(xié)調(diào)代理的角色,所有代理服務(wù)器啟動時,都會連接到Zookeeper進(jìn)行注冊,當(dāng)代理狀態(tài)發(fā)生變化時,Zookeeper也會儲存這些數(shù)據(jù),在過去,ZooKeeper是一個強(qiáng)大的工具,但是畢竟ZooKeeper是一個獨(dú)立的軟件,使得Kafka整個系統(tǒng)變得復(fù)雜,因此官方?jīng)Q定使用內(nèi)部Quorum控制器來取代ZooKeeper。
這項(xiàng)工作從去年4月開始,而現(xiàn)在這項(xiàng)工作取得部分成果,用戶將可以在2.8版本,在沒有ZooKeeper的情況下執(zhí)行Kafka,官方稱這項(xiàng)功能為Kafka Raft元數(shù)據(jù)模式(KRaft)。
在KRaft模式,過去由Kafka控制器和ZooKeeper所操作的元數(shù)據(jù),將合并到這個新的Quorum控制器,并且在Kafka集群內(nèi)部執(zhí)行,當(dāng)然,如果使用者有特殊使用情境,Quorum控制器也可以在專用的硬件上執(zhí)行。
說完在新版本中移除zookeeper這個事,接著聊kafka的其他功能:
kafka支持消息持久化,消費(fèi)端是主動拉取數(shù)據(jù),消費(fèi)狀態(tài)和訂閱關(guān)系由客戶端負(fù)責(zé)維護(hù),消息消費(fèi)完后,不會立即刪除,會保留歷史消息。因此支持多訂閱時,消息只會存儲一份就可以。
producer主要是用于生產(chǎn)消息,是kafka當(dāng)中的消息生產(chǎn)者,生產(chǎn)的消息通過topic進(jìn)行歸類,保存到kafka的broker里面去。
kafka當(dāng)中,topic是消息的歸類,一個topic可以有多個分區(qū)(partition),每個分區(qū)保存部分topic的數(shù)據(jù),所有的partition當(dāng)中的數(shù)據(jù)全部合并起來,就是一個topic當(dāng)中的所有的數(shù)據(jù)。
一個broker服務(wù)下,可以創(chuàng)建多個分區(qū),broker數(shù)與分區(qū)數(shù)沒有關(guān)系;
在kafka中,每一個分區(qū)會有一個編號:編號從0開始。
每一個分區(qū)內(nèi)的數(shù)據(jù)是有序的,但全局的數(shù)據(jù)不能保證是有序的。(有序是指生產(chǎn)什么樣順序,消費(fèi)時也是什么樣的順序)
consumer是kafka當(dāng)中的消費(fèi)者,主要用于消費(fèi)kafka當(dāng)中的數(shù)據(jù),消費(fèi)者一定是歸屬于某個消費(fèi)組中的。
消費(fèi)者組由一個或者多個消費(fèi)者組成,同一個組中的消費(fèi)者對于同一條消息只消費(fèi)一次。
每個消費(fèi)者都屬于某個消費(fèi)者組,如果不指定,那么所有的消費(fèi)者都屬于默認(rèn)的組。
每個消費(fèi)者組都有一個ID,即group ID。組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來消費(fèi)一個訂閱主題( topic)的所有分區(qū)(partition)。
當(dāng)然,每個分區(qū)只能由同一個消費(fèi)組內(nèi)的一個消費(fèi)者(consumer)來消費(fèi),可以由不同的消費(fèi)組來消費(fèi)。
partition數(shù)量決定了每個consumer group中并發(fā)消費(fèi)者的最大數(shù)量。如下圖:
如上面左圖所示,如果只有兩個分區(qū),即使一個組內(nèi)的消費(fèi)者有4個,也會有兩個空閑的。
如上面右圖所示,有4個分區(qū),每個消費(fèi)者消費(fèi)一個分區(qū),并發(fā)量達(dá)到最大4。
在來看如下一幅圖:
如上圖所示,不同的消費(fèi)者組消費(fèi)同一個topic,這個topic有4個分區(qū),分布在兩個節(jié)點(diǎn)上。左邊的消費(fèi)組1有兩個消費(fèi)者,每個消費(fèi)者就要消費(fèi)兩個分區(qū)才能把消息完整的消費(fèi)完,右邊的消費(fèi)組2有四個消費(fèi)者,每個消費(fèi)者消費(fèi)一個分區(qū)即可。
總結(jié)下kafka中分區(qū)與消費(fèi)組的關(guān)系:
消費(fèi)組:由一個或者多個消費(fèi)者組成,同一個組中的消費(fèi)者對于同一條消息只消費(fèi)一次。某一個主題下的分區(qū)數(shù),對于消費(fèi)該主題的同一個消費(fèi)組下的消費(fèi)者數(shù)量,應(yīng)該小于等于該主題下的分區(qū)數(shù)。
如:某一個主題有4個分區(qū),那么消費(fèi)組中的消費(fèi)者應(yīng)該小于等于4,而且最好與分區(qū)數(shù)成整數(shù)倍 124 這樣。同一個分區(qū)下的數(shù)據(jù),在同一時刻,不能同一個消費(fèi)組的不同消費(fèi)者消費(fèi)。
總結(jié):分區(qū)數(shù)越多,同一時間可以有越多的消費(fèi)者來進(jìn)行消費(fèi),消費(fèi)數(shù)據(jù)的速度就會越快,提高消費(fèi)的性能。
kafka 中的分區(qū)副本如下圖所示:
副本數(shù)(replication-factor):控制消息保存在幾個broker(服務(wù)器)上,一般情況下副本數(shù)等于broker的個數(shù)。
一個broker服務(wù)下,不可以創(chuàng)建多個副本因子。創(chuàng)建主題時,副本因子應(yīng)該小于等于可用的broker數(shù)。
副本因子操作以分區(qū)為單位的。每個分區(qū)都有各自的主副本和從副本;
主副本叫做leader,從副本叫做 follower(在有多個副本的情況下,kafka會為同一個分區(qū)下的所有分區(qū),設(shè)定角色關(guān)系:一個leader和N個 follower),處于同步狀態(tài)的副本叫做in-sync-replicas(ISR);
follower通過拉的方式從leader同步數(shù)據(jù)。消費(fèi)者和生產(chǎn)者都是從leader讀寫數(shù)據(jù),不與follower交互。
副本因子的作用:讓kafka讀取數(shù)據(jù)和寫入數(shù)據(jù)時的可靠性。
副本因子是包含本身,同一個副本因子不能放在同一個broker中。
如果某一個分區(qū)有三個副本因子,就算其中一個掛掉,那么只會剩下的兩個中,選擇一個leader,但不會在其他的broker中,另啟動一個副本(因?yàn)樵诹硪慌_啟動的話,存在數(shù)據(jù)傳遞,只要在機(jī)器之間有數(shù)據(jù)傳遞,就會長時間占用網(wǎng)絡(luò)IO,kafka是一個高吞吐量的消息系統(tǒng),這個情況不允許發(fā)生)所以不會在另一個broker中啟動。
如果所有的副本都掛了,生產(chǎn)者如果生產(chǎn)數(shù)據(jù)到指定分區(qū)的話,將寫入不成功。
lsr表示:當(dāng)前可用的副本。
一個partition當(dāng)中由多個segment文件組成,每個segment文件,包含兩部分,一個是 .log 文件,另外一個是 .index 文件,其中 .log 文件包含了我們發(fā)送的數(shù)據(jù)存儲,.index 文件,記錄的是我們.log文件的數(shù)據(jù)索引值,以便于我們加快數(shù)據(jù)的查詢速度。
索引文件與數(shù)據(jù)文件的關(guān)系
既然它們是一一對應(yīng)成對出現(xiàn),必然有關(guān)系。索引文件中元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址。
比如索引文件中 3,497 代表:數(shù)據(jù)文件中的第三個message,它的偏移地址為497。
再來看數(shù)據(jù)文件中,Message 368772表示:在全局partiton中是第368772個message。
注:segment index file 采取稀疏索引存儲方式,減少索引文件大小,通過mmap(內(nèi)存映射)可以直接內(nèi)存操作,稀疏索引為數(shù)據(jù)文件的每個對應(yīng)message設(shè)置一個元數(shù)據(jù)指針,它比稠密索引節(jié)省了更多的存儲空間,但查找起來需要消耗更多的時間。
.index 與 .log 對應(yīng)關(guān)系如下:
上圖左半部分是索引文件,里面存儲的是一對一對的key-value,其中key是消息在數(shù)據(jù)文件(對應(yīng)的log文件)中的編號,比如“1,3,6,8……”,分別表示在log文件中的第1條消息、第3條消息、第6條消息、第8條消息……
那么為什么在index文件中這些編號不是連續(xù)的呢?這是因?yàn)閕ndex文件中并沒有為數(shù)據(jù)文件中的每條消息都建立索引,而是采用了稀疏存儲的方式,每隔一定字節(jié)的數(shù)據(jù)建立一條索引。
這樣避免了索引文件占用過多的空間,從而可以將索引文件保留在內(nèi)存中。但缺點(diǎn)是沒有建立索引的Message也不能一次定位到其在數(shù)據(jù)文件的位置,從而需要做一次順序掃描,但是這次順序掃描的范圍就很小了。
value代表的是在全局partiton中的第幾個消息。
以索引文件中元數(shù)據(jù) 3,497 為例,其中3代表在右邊log數(shù)據(jù)文件中從上到下第3個消息,497表示該消息的物理偏移地址(位置)為497(也表示在全局partiton表示第497個消息-順序?qū)懭胩匦?。
log日志目錄及組成kafka在我們指定的log.dir目錄下,會創(chuàng)建一些文件夾;名字是 (主題名字-分區(qū)名) 所組成的文件夾。在(主題名字-分區(qū)名)的目錄下,會有兩個文件存在,如下所示:
#索引文件
00000000000000000000.index
#日志內(nèi)容
00000000000000000000.log
在目錄下的文件,會根據(jù)log日志的大小進(jìn)行切分,.log文件的大小為1G的時候,就會進(jìn)行切分文件;如下:
-rw-r--r--. 1 root root 389k 1月 17 18:03 00000000000000000000.index
-rw-r--r--. 1 root root 1.0G 1月 17 18:03 00000000000000000000.log
-rw-r--r--. 1 root root 10M 1月 17 18:03 00000000000000077894.index
-rw-r--r--. 1 root root 127M 1月 17 18:03 00000000000000077894.log
在kafka的設(shè)計(jì)中,將offset值作為了文件名的一部分。
segment文件命名規(guī)則:partion全局的第一個segment從0開始,后續(xù)每個segment文件名為上一個全局partion的最大offset(偏移message數(shù))。數(shù)值最大為64位long大小,20位數(shù)字字符長度,沒有數(shù)字就用0填充。
通過索引信息可以快速定位到message。通過index元數(shù)據(jù)全部映射到內(nèi)存,可以避免segment File的IO磁盤操作;
通過索引文件稀疏存儲,可以大幅降低index文件元數(shù)據(jù)占用空間大小。
稀疏索引:為了數(shù)據(jù)創(chuàng)建索引,但范圍并不是為每一條創(chuàng)建,而是為某一個區(qū)間創(chuàng)建;好處:就是可以減少索引值的數(shù)量。不好的地方:找到索引區(qū)間之后,要得進(jìn)行第二次處理。
生產(chǎn)者發(fā)送到kafka的每條消息,都被kafka包裝成了一個message
message的物理結(jié)構(gòu)如下圖所示:
所以生產(chǎn)者發(fā)送給kafka的消息并不是直接存儲起來,而是經(jīng)過kafka的包裝,每條消息都是上圖這個結(jié)構(gòu),只有最后一個字段才是真正生產(chǎn)者發(fā)送的消息數(shù)據(jù)。
創(chuàng)建一個名字為test的主題, 有三個分區(qū),有兩個副本:
bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic test
查看kafka當(dāng)中存在的主題:
bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181
模擬生產(chǎn)者來生產(chǎn)數(shù)據(jù):
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
執(zhí)行以下命令來模擬消費(fèi)者進(jìn)行消費(fèi)數(shù)據(jù):
bin/kafka-console-consumer.sh --from-beginning --topic test --zookeeper node01:2181,node02:2181,node03:2181
執(zhí)行以下命令運(yùn)行describe查看topic的相關(guān)信息:
bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test
結(jié)果說明:
這是輸出的解釋。第一行給出了所有分區(qū)的摘要,每個附加行提供有關(guān)一個分區(qū)的信息。由于我們只有一個分 區(qū)用于此主題,因此只有一行。
“l(fā)eader”是負(fù)責(zé)給定分區(qū)的所有讀取和寫入的節(jié)點(diǎn)。每個節(jié)點(diǎn)將成為隨機(jī)選擇的分區(qū)部分的領(lǐng)導(dǎo)者。(因?yàn)樵趉afka中 如果有多個副本的話,就會存在leader和follower的關(guān)系,表示當(dāng)前這個副本為leader所在的broker是哪一個)
“replicas”是復(fù)制此分區(qū)日志的節(jié)點(diǎn)列表,無論它們是否為領(lǐng)導(dǎo)者,或者即使它們當(dāng)前處于活動狀態(tài)。(所有副本列表0,1,2)
“isr”是“同步”復(fù)制品的集合。這是副本列表的子集,該列表當(dāng)前處于活躍狀態(tài)并且已經(jīng)被領(lǐng)導(dǎo)者捕獲。(可用的列表數(shù))
執(zhí)行以下命令可以增加topic分區(qū)數(shù):
bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8
動態(tài)修改kakfa的配置:
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --config flush.messages=1
動態(tài)刪除kafka集群配置:
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --delete-config flush.messages
目前刪除topic在默認(rèn)情況下知識打上一個刪除的標(biāo)記,在重新啟動kafka后才刪除。
如果需要立即刪除,則需要在server.properties中配置:
delete.topic.enable=true
然后執(zhí)行以下命令進(jìn)行刪除topic:
kafka-topics.sh --zookeeper zkhost:port --delete --topic topicName
使用生產(chǎn)者,生產(chǎn)數(shù)據(jù):
/**
* 訂單的生產(chǎn)者代碼,
*/
public class OrderProducer {
public static void main(String[] args) throws InterruptedException {
/* 1、連接集群,通過配置文件的方式
* 2、發(fā)送數(shù)據(jù)-topic:order,value
*/
Properties props = new Properties(); props.put('bootstrap.servers', 'node01:9092'); props.put('acks', 'all');
props.put('retries', 0);
props.put('batch.size', 16384);
props.put('linger.ms', 1);
props.put('buffer.memory', 33554432);
props.put('key.serializer',
'org.apache.kafka.common.serialization.StringSerializer');
props.put('value.serializer',
'org.apache.kafka.common.serialization.StringSerializer');
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>
(props);
for (int i = 0; i < 1000; i++) {
// 發(fā)送數(shù)據(jù) ,需要一個producerRecord對象,最少參數(shù) String topic, V value kafkaProducer.send(new ProducerRecord<String, String>('order', '訂單信
息!'+i));
Thread.sleep(100);
}
}
}
kafka當(dāng)中的數(shù)據(jù)分區(qū):
kafka生產(chǎn)者發(fā)送的消息,都是保存在broker當(dāng)中,我們可以自定義分區(qū)規(guī)則,決定消息發(fā)送到哪個partition里面去進(jìn)行保存查看ProducerRecord這個類的源碼,就可以看到kafka的各種不同分區(qū)策略
kafka當(dāng)中支持以下四種數(shù)據(jù)的分區(qū)方式:
//第一種分區(qū)策略,如果既沒有指定分區(qū)號,也沒有指定數(shù)據(jù)key,那么就會使用輪詢的方式將數(shù)據(jù)均勻的發(fā)送到不同的分區(qū)里面去
//ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>('mypartition', 'mymessage' + i);
//kafkaProducer.send(producerRecord1);
//第二種分區(qū)策略 如果沒有指定分區(qū)號,指定了數(shù)據(jù)key,通過key.hashCode % numPartitions來計(jì)算數(shù)據(jù)究竟會保存在哪一個分區(qū)里面
//注意:如果數(shù)據(jù)key,沒有變化 key.hashCode % numPartitions = 固定值 所有的數(shù)據(jù)都會寫入到某一個分區(qū)里面去
//ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>('mypartition', 'mykey', 'mymessage' + i);
//kafkaProducer.send(producerRecord2);
//第三種分區(qū)策略:如果指定了分區(qū)號,那么就會將數(shù)據(jù)直接寫入到對應(yīng)的分區(qū)里面去
// ProducerRecord<String, String> producerRecord3 = new ProducerRecord<>('mypartition', 0, 'mykey', 'mymessage' + i);
// kafkaProducer.send(producerRecord3);
//第四種分區(qū)策略:自定義分區(qū)策略。如果不自定義分區(qū)規(guī)則,那么會將數(shù)據(jù)使用輪詢的方式均勻的發(fā)送到各個分區(qū)里面去
kafkaProducer.send(new ProducerRecord<String, String>('mypartition','mymessage'+i));
自定義分區(qū)策略:
public class KafkaCustomPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public int partition(String topic, Object arg1, byte[] keyBytes, Object arg3, byte[] arg4, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int partitionNum = partitions.size();
Random random = new Random();
int partition = random.nextInt(partitionNum);
return partition;
}
@Override
public void close() {
}
}
主代碼中添加配置:
@Test
public void kafkaProducer() throws Exception {
//1、準(zhǔn)備配置文件
Properties props = new Properties();
props.put('bootstrap.servers', 'node01:9092,node02:9092,node03:9092');
props.put('acks', 'all');
props.put('retries', 0);
props.put('batch.size', 16384);
props.put('linger.ms', 1);
props.put('buffer.memory', 33554432);
props.put('partitioner.class', 'cn.itcast.kafka.partitioner.KafkaCustomPartitioner');
props.put('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer');
props.put('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer');
//2、創(chuàng)建KafkaProducer
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
for (int i=0;i<100;i++){
//3、發(fā)送數(shù)據(jù)
kafkaProducer.send(new ProducerRecord<String, String>('testpart','0','value'+i));
}
kafkaProducer.close();
}
消費(fèi)必要條件:
消費(fèi)者要從kafka Cluster進(jìn)行消費(fèi)數(shù)據(jù),必要條件有以下四個:
地址:bootstrap.servers=node01:9092
序列化:key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer
主題(topic):需要制定具體的某個topic(order)即可。
消費(fèi)者組:group.id=test
消費(fèi)完成之后,自動提交offset:
/**
* 消費(fèi)訂單數(shù)據(jù)--- javaben.tojson
*/
public class OrderConsumer {
public static void main(String[] args) {
// 1\連接集群
Properties props = new Properties(); props.put('bootstrap.servers', 'hadoop-01:9092'); props.put('group.id', 'test');
//以下兩行代碼 ---消費(fèi)者自動提交offset值
props.put('enable.auto.commit', 'true');
props.put('auto.commit.interval.ms', '1000');
props.put('key.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer');
props.put('value.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer');
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>
(props);
// 2、發(fā)送數(shù)據(jù) 發(fā)送數(shù)據(jù)需要,訂閱下要消費(fèi)的topic。order kafkaConsumer.subscribe(Arrays.asList('order'));
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);// jdk queue offer插入、poll獲取元素。blockingqueue put插入原生, take獲取元素
for (ConsumerRecord<String, String> record : consumerRecords) { System.out.println('消費(fèi)的數(shù)據(jù)為:' + record.value());
}
}
}
}
如果Consumer在獲取數(shù)據(jù)后,需要加入處理,數(shù)據(jù)完畢后才確認(rèn)offset,需要程序來控制offset的確認(rèn)。
關(guān)閉自動提交確認(rèn)選項(xiàng):props.put('enable.auto.commit', 'false');
手動提交offset值:kafkaConsumer.commitSync();
完整代碼如下:
Properties props = new Properties();
props.put('bootstrap.servers', 'localhost:9092');
props.put('group.id', 'test');
//關(guān)閉自動提交確認(rèn)選項(xiàng)
props.put('enable.auto.commit', 'false');
props.put('key.deserializer',
'org.apache.kafka.common.serialization.StringDeserializer');
props.put('value.deserializer',
'org.apache.kafka.common.serialization.StringDeserializer');
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList('test'));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
// 手動提交offset值
consumer.commitSync();
buffer.clear();
}
}
上面的示例使用commitSync將所有已接收的記錄標(biāo)記為已提交。在某些情況下,可能希望通過明確指定偏移量來更好地控制已提交的記錄。在下面的示例中,我們在完成處理每個分區(qū)中的記錄后提交偏移量:
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ': ' + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() -1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally { consumer.close();}
注意事項(xiàng):
提交的偏移量應(yīng)始終是應(yīng)用程序?qū)⒆x取的下一條消息的偏移量。因此,在調(diào)用commitSync(偏移量)時,應(yīng)該在最后處理的消息的偏移量中添加一個。
如果進(jìn)程正在維護(hù)與該分區(qū)關(guān)聯(lián)的某種本地狀態(tài)(如本地磁盤上的鍵值存儲),那么它應(yīng)該只獲取它在磁盤上維護(hù)的分區(qū)的記錄。
如果進(jìn)程本身具有高可用性,并且如果失敗則將重新啟動(可能使用YARN,Mesos或AWS工具等集群管理框 架,或作為流處理框架的一部分)。在這種情況下,Kafka不需要檢測故障并重新分配分區(qū),因?yàn)橄倪^程將在另一臺機(jī)器上重新啟動。
Properties props = new Properties(); props.put('bootstrap.servers', 'localhost:9092'); props.put('group.id', 'test');
props.put('enable.auto.commit', 'true');
props.put('auto.commit.interval.ms', '1000');
props.put('key.deserializer',
'org.apache.kafka.common.serialization.StringDeserializer');
props.put('value.deserializer',
'org.apache.kafka.common.serialization.StringDeserializer');
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//consumer.subscribe(Arrays.asList('foo', 'bar'));
//手動指定消費(fèi)指定分區(qū)的數(shù)據(jù)---start
String topic = 'foo';
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1));
//手動指定消費(fèi)指定分區(qū)的數(shù)據(jù)---end
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf('offset = %d, key = %s, value = %s%n', record.offset(), record.key(), record.value());
}
注意事項(xiàng):
要使用此模式,只需使用要使用的分區(qū)的完整列表調(diào)用assign(Collection),而不是使用subscribe訂閱主題。
主題與分區(qū)訂閱只能二選一。
說明:
已經(jīng)消費(fèi)的數(shù)據(jù)對于kafka來說,會將消費(fèi)組里面的o?set值進(jìn)行修改,那什么時候進(jìn)行修改了?是在數(shù)據(jù)消費(fèi) 完成之后,比如在控制臺打印完后自動提交;
提交過程:是通過kafka將o?set進(jìn)行移動到下個message所處的o?set的位置。
拿到數(shù)據(jù)后,存儲到hbase中或者mysql中,如果hbase或者mysql在這個時候連接不上,就會拋出異常,如果在處理數(shù)據(jù)的時候已經(jīng)進(jìn)行了提交,那么kafka傷的o?set值已經(jīng)進(jìn)行了修改了,但是hbase或者mysql中沒有數(shù)據(jù),這個時候就會出現(xiàn)數(shù)據(jù)丟失。
如果在處理代碼中正常處理了,但是在提交o?set請求的時候,沒有連接到kafka或者出現(xiàn)了故障,那么該次修 改o?set的請求是失敗的,那么下次在進(jìn)行讀取同一個分區(qū)中的數(shù)據(jù)時,會從已經(jīng)處理掉的o?set值再進(jìn)行處理一 次,那么在hbase中或者mysql中就會產(chǎn)生兩條一樣的數(shù)據(jù),也就是數(shù)據(jù)重復(fù)。
流程描述:
Consumer連接指定的Topic partition所在leader broker,采用pull方式從kafkalogs中獲取消息。對于不同的消費(fèi)模式,會將offset保存在不同的地方官網(wǎng)關(guān)于high level API 以及l(fā)ow level API的簡介:http://kafka.apache.org/0100/documentation.html#impl_consumer
高階API(High Level API):
kafka消費(fèi)者高階API簡單;隱藏Consumer與Broker細(xì)節(jié);相關(guān)信息保存在zookeeper中:
/* create a connection to the cluster */
ConsumerConnector connector = Consumer.create(consumerConfig);
interface ConsumerConnector {
/**
This method is used to get a list of KafkaStreams, which are iterators over
MessageAndMetadata objects from which you can obtain messages and their
associated metadata (currently only topic).
Input: a map of <topic, #streams>
Output: a map of <topic, list of message streams>
*/
public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
/**
You can also obtain a list of KafkaStreams, that iterate over messages
from topics that match a TopicFilter. (A TopicFilter encapsulates a
whitelist or a blacklist which is a standard Java regex.)
*/
public List<KafkaStream> createMessageStreamsByFilter( TopicFilter topicFilter, int numStreams);
/* Commit the offsets of all messages consumed so far. */ public commitOffsets()
/* Shut down the connector */ public shutdown()
}
說明:大部分的操作都已經(jīng)封裝好了,比如:當(dāng)前消費(fèi)到哪個位置下了,但是不夠靈活(工作過程推薦使用)
低級API(Low Level API):
kafka消費(fèi)者低級API非常靈活;需要自己負(fù)責(zé)維護(hù)連接Controller Broker。保存offset,Consumer Partition對應(yīng)關(guān)系:
class SimpleConsumer {
/* Send fetch request to a broker and get back a set of messages. */
public ByteBufferMessageSet fetch(FetchRequest request);
/* Send a list of fetch requests to a broker and get back a response set. */ public MultiFetchResponse multifetch(List<FetchRequest> fetches);
/**
Get a list of valid offsets (up to maxSize) before the given time.
The result is a list of offsets, in descending order.
@param time: time in millisecs,
if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest
offset available. if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest
available. public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
* offset
*/
說明:沒有進(jìn)行包裝,所有的操作有用戶決定,如自己的保存某一個分區(qū)下的記錄,你當(dāng)前消費(fèi)到哪個位置。
需求:使用StreamAPI獲取test這個topic當(dāng)中的數(shù)據(jù),然后將數(shù)據(jù)全部轉(zhuǎn)為大寫,寫入到test2這個topic當(dāng)中去。
第一步:創(chuàng)建一個topic
node01服務(wù)器使用以下命令來常見一個 topic 名稱為test2:
bin/kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test2 --zookeeper node01:2181,node02:2181,node03:2181
第二步:開發(fā)StreamAPI
public class StreamAPI {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, 'wordcount-application');
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'node01:9092');
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
KStreamBuilder builder = new KStreamBuilder();
builder.stream('test').mapValues(line -> line.toString().toUpperCase()).to('test2');
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
}
}
執(zhí)行上述代碼,監(jiān)聽獲取 test中的數(shù)據(jù),然后轉(zhuǎn)成大寫,將結(jié)果寫入 test2。
第三步:生產(chǎn)數(shù)據(jù)
node01執(zhí)行以下命令,向test這個topic當(dāng)中生產(chǎn)數(shù)據(jù):
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
第四步:消費(fèi)數(shù)據(jù)
node02執(zhí)行一下命令消費(fèi)test2這個topic當(dāng)中的數(shù)據(jù):
bin/kafka-console-consumer.sh --from-beginning --topic test2 --zookeeper node01:2181,node02:2181,node03:2181
生產(chǎn)者發(fā)送給kafka數(shù)據(jù),可以采用同步方式或異步方式
同步方式:
發(fā)送一批數(shù)據(jù)給kafka后,等待kafka返回結(jié)果:
異步方式:
發(fā)送一批數(shù)據(jù)給kafka,只是提供一個回調(diào)函數(shù):
注:如果broker遲遲不給ack,而buffer又滿了,開發(fā)者可以設(shè)置是否直接清空buffer中的數(shù)據(jù)。
生產(chǎn)者數(shù)據(jù)發(fā)送出去,需要服務(wù)端返回一個確認(rèn)碼,即ack響應(yīng)碼;ack的響應(yīng)有三個狀態(tài)值0,1,-1
0:生產(chǎn)者只負(fù)責(zé)發(fā)送數(shù)據(jù),不關(guān)心數(shù)據(jù)是否丟失,丟失的數(shù)據(jù),需要再次發(fā)送
1:partition的leader收到數(shù)據(jù),不管follow是否同步完數(shù)據(jù),響應(yīng)的狀態(tài)碼為1
-1:所有的從節(jié)點(diǎn)都收到數(shù)據(jù),響應(yīng)的狀態(tài)碼為-1
如果broker端一直不返回ack狀態(tài),producer永遠(yuǎn)不知道是否成功;producer可以設(shè)置一個超時時間10s,超過時間認(rèn)為失敗。
在broker中,保證數(shù)據(jù)不丟失主要是通過副本因子(冗余),防止數(shù)據(jù)丟失。
在消費(fèi)者消費(fèi)數(shù)據(jù)的時候,只要每個消費(fèi)者記錄好offset值即可,就能保證數(shù)據(jù)不丟失。也就是需要我們自己維護(hù)偏移量(offset),可保存在 Redis 中。
Server.properties配置文件說明:
#broker的全局唯一編號,不能重復(fù)
broker.id=0
#用來監(jiān)聽鏈接的端口,producer或consumer將在此端口建立連接
port=9092
#處理網(wǎng)絡(luò)請求的線程數(shù)量
num.network.threads=3
#用來處理磁盤IO的線程數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接受套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#kafka運(yùn)行日志存放的路徑
log.dirs=/export/data/kafka/
#topic在當(dāng)前broker上的分片個數(shù)
num.partitions=2
#用來恢復(fù)和清理data下數(shù)據(jù)的線程數(shù)量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長時間,超時將被刪除
log.retention.hours=168
#滾動生成新的segment文件的最大時間
log.roll.hours=1
#日志文件中每個segment的大小,默認(rèn)為1G
log.segment.bytes=1073741824
#周期性檢查文件大小的時間
log.retention.check.interval.ms=300000
#日志清理是否打開
log.cleaner.enable=true
#broker需要使用zookeeper保存meta數(shù)據(jù)
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
#zookeeper鏈接超時時間
zookeeper.connection.timeout.ms=6000
#partion buffer中,消息的條數(shù)達(dá)到閾值,將觸發(fā)flush到磁盤
log.flush.interval.messages=10000
#消息buffer的時間,達(dá)到閾值,將觸發(fā)flush到磁盤
log.flush.interval.ms=3000
#刪除topic需要server.properties中設(shè)置delete.topic.enable=true否則只是標(biāo)記刪除
delete.topic.enable=true
#此處的host.name為本機(jī)IP(重要),如果不改,則客戶端會拋出:Producer connection to localhost:9092 unsuccessful 錯誤!
host.name=kafka01
advertised.host.name=192.168.140.128
producer生產(chǎn)者配置文件說明
#指定kafka節(jié)點(diǎn)列表,用于獲取metadata,不必全部指定
metadata.broker.list=node01:9092,node02:9092,node03:9092
# 指定分區(qū)處理類。默認(rèn)kafka.producer.DefaultPartitioner,表通過key哈希到對應(yīng)分區(qū)
#partitioner.class=kafka.producer.DefaultPartitioner
# 是否壓縮,默認(rèn)0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。壓縮后消息中會有頭來指明消息壓縮類型,故在消費(fèi)者端消息解壓是透明的無需指定。
compression.codec=none
# 指定序列化處理類
serializer.class=kafka.serializer.DefaultEncoder
# 如果要壓縮消息,這里指定哪些topic要壓縮消息,默認(rèn)empty,表示不壓縮。
#compressed.topics=
# 設(shè)置發(fā)送數(shù)據(jù)是否需要服務(wù)端的反饋,有三個值0,1,-1
# 0: producer不會等待broker發(fā)送ack
# 1: 當(dāng)leader接收到消息之后發(fā)送ack
# -1: 當(dāng)所有的follower都同步消息成功后發(fā)送ack.
request.required.acks=0
# 在向producer發(fā)送ack之前,broker允許等待的最大時間 ,如果超時,broker將會向producer發(fā)送一個error ACK.意味著上一次消息因?yàn)槟撤N原因未能成功(比如follower未能同步成功)
request.timeout.ms=10000
# 同步還是異步發(fā)送消息,默認(rèn)“sync”表同步,'async'表異步。異步可以提高發(fā)送吞吐量,
也意味著消息將會在本地buffer中,并適時批量發(fā)送,但是也可能導(dǎo)致丟失未發(fā)送過去的消息
producer.type=sync
# 在async模式下,當(dāng)message被緩存的時間超過此值后,將會批量發(fā)送給broker,默認(rèn)為5000ms
# 此值和batch.num.messages協(xié)同工作.
queue.buffering.max.ms = 5000
# 在async模式下,producer端允許buffer的最大消息量
# 無論如何,producer都無法盡快的將消息發(fā)送給broker,從而導(dǎo)致消息在producer端大量沉積
# 此時,如果消息的條數(shù)達(dá)到閥值,將會導(dǎo)致producer端阻塞或者消息被拋棄,默認(rèn)為10000
queue.buffering.max.messages=20000
# 如果是異步,指定每次批量發(fā)送數(shù)據(jù)量,默認(rèn)為200
batch.num.messages=500
# 當(dāng)消息在producer端沉積的條數(shù)達(dá)到'queue.buffering.max.meesages'后
# 阻塞一定時間后,隊(duì)列仍然沒有enqueue(producer仍然沒有發(fā)送出任何消息)
# 此時producer可以繼續(xù)阻塞或者將消息拋棄,此timeout值用于控制'阻塞'的時間
# -1: 無阻塞超時限制,消息不會被拋棄
# 0:立即清空隊(duì)列,消息被拋棄
queue.enqueue.timeout.ms=-1
# 當(dāng)producer接收到error ACK,或者沒有接收到ACK時,允許消息重發(fā)的次數(shù)
# 因?yàn)閎roker并沒有完整的機(jī)制來避免消息重復(fù),所以當(dāng)網(wǎng)絡(luò)異常時(比如ACK丟失)
# 有可能導(dǎo)致broker接收到重復(fù)的消息,默認(rèn)值為3.
message.send.max.retries=3
# producer刷新topic metada的時間間隔,producer需要知道partition leader的位置,以及當(dāng)前topic的情況
# 因此producer需要一個機(jī)制來獲取最新的metadata,當(dāng)producer遇到特定錯誤時,將會立即刷新
# (比如topic失效,partition丟失,leader失效等),此外也可以通過此參數(shù)來配置額外的刷新機(jī)制,默認(rèn)值600000
topic.metadata.refresh.interval.ms=60000
consumer消費(fèi)者配置詳細(xì)說明:
# zookeeper連接服務(wù)器地址
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
# zookeeper的session過期時間,默認(rèn)5000ms,用于檢測消費(fèi)者是否掛掉
zookeeper.session.timeout.ms=5000
#當(dāng)消費(fèi)者掛掉,其他消費(fèi)者要等該指定時間才能檢查到并且觸發(fā)重新負(fù)載均衡
zookeeper.connection.timeout.ms=10000
# 指定多久消費(fèi)者更新offset到zookeeper中。注意offset更新時基于time而不是每次獲得的消息。一旦在更新zookeeper發(fā)生異常并重啟,將可能拿到已拿到過的消息
zookeeper.sync.time.ms=2000
#指定消費(fèi)
group.id=itcast
# 當(dāng)consumer消費(fèi)一定量的消息之后,將會自動向zookeeper提交offset信息
# 注意offset信息并不是每消費(fèi)一次消息就向zk提交一次,而是現(xiàn)在本地保存(內(nèi)存),并定期提交,默認(rèn)為true
auto.commit.enable=true
# 自動更新時間。默認(rèn)60 * 1000
auto.commit.interval.ms=1000
# 當(dāng)前consumer的標(biāo)識,可以設(shè)定,也可以有系統(tǒng)生成,主要用來跟蹤消息消費(fèi)情況,便于觀察
conusmer.id=xxx
# 消費(fèi)者客戶端編號,用于區(qū)分不同客戶端,默認(rèn)客戶端程序自動產(chǎn)生
client.id=xxxx
# 最大取多少塊緩存到消費(fèi)者(默認(rèn)10)
queued.max.message.chunks=50
# 當(dāng)有新的consumer加入到group時,將會reblance,此后將會有partitions的消費(fèi)端遷移到新 的consumer上,如果一個consumer獲得了某個partition的消費(fèi)權(quán)限,那么它將會向zk注冊 'Partition Owner registry'節(jié)點(diǎn)信息,但是有可能此時舊的consumer尚沒有釋放此節(jié)點(diǎn), 此值用于控制,注冊節(jié)點(diǎn)的重試次數(shù).
rebalance.max.retries=5
# 獲取消息的最大尺寸,broker不會像consumer輸出大于此值的消息chunk 每次feth將得到多條消息,此值為總大小,提升此值,將會消耗更多的consumer端內(nèi)存
fetch.min.bytes=6553600
# 當(dāng)消息的尺寸不足時,server阻塞的時間,如果超時,消息將立即發(fā)送給consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
# 如果zookeeper沒有offset值或offset值超出范圍。那么就給個初始的offset。有smallest、largest、anything可選,分別表示給當(dāng)前最小的offset、當(dāng)前最大的offset、拋異常。默認(rèn)largest
auto.offset.reset=smallest
# 指定序列化處理類
derializer.class=kafka.serializer.DefaultDecoder
分布式系統(tǒng)(distributed system)正變得越來越重要,大型網(wǎng)站幾乎都是分布式的。
分布式系統(tǒng)的最大難點(diǎn),就是各個節(jié)點(diǎn)的狀態(tài)如何同步。
為了解決各個節(jié)點(diǎn)之間的狀態(tài)同步問題,在1998年,由加州大學(xué)的計(jì)算機(jī)科學(xué)家 Eric Brewer 提出分布式系統(tǒng)的三個指標(biāo),分別是:
Consistency:一致性
Availability:可用性
Partition tolerance:分區(qū)容錯性
Eric Brewer 說,這三個指標(biāo)不可能同時做到。最多只能同時滿足其中兩個條件,這個結(jié)論就叫做 CAP 定理。
CAP理論是指:分布式系統(tǒng)中,一致性、可用性和分區(qū)容忍性最多只能同時滿足兩個。
一致性:Consistency
可用性:Availability
分區(qū)容錯性:Partition tolerance
一般而言,都要求保證分區(qū)容忍性。所以在CAP理論下,更多的是需要在可用性和一致性之間做權(quán)衡。
先看 Partition tolerance,中文叫做'分區(qū)容錯'。
大多數(shù)分布式系統(tǒng)都分布在多個子網(wǎng)絡(luò)。每個子網(wǎng)絡(luò)就叫做一個區(qū)(partition)。分區(qū)容錯的意思是,區(qū)間通信可能失敗。比如,一臺服務(wù)器放在中國,另一臺服務(wù)器放在美國,這就是兩個區(qū),它們之間可能無法通信。
上圖中,G1 和 G2 是兩臺跨區(qū)的服務(wù)器。G1 向 G2 發(fā)送一條消息,G2 可能無法收到。系統(tǒng)設(shè)計(jì)的時候,必須考慮到這種情況。
一般來說,分區(qū)容錯無法避免,因此可以認(rèn)為 CAP 的 P 總是存在的。即永遠(yuǎn)可能存在分區(qū)容錯這個問題
Consistency 中文叫做'一致性'。意思是,寫操作之后的讀操作,必須返回該值。舉例來說,某條記錄是 v0,用戶向 G1 發(fā)起一個寫操作,將其改為 v1。
為了讓 G2 也能變?yōu)?v1,就要在 G1 寫操作的時候,讓 G1 向 G2 發(fā)送一條消息,要求 G2 也改成 v1。
這樣的話,用戶向 G2 發(fā)起讀操作,也能得到 v1。
Availability 中文叫做'可用性',意思是只要收到用戶的請求,服務(wù)器就必須給出回應(yīng)。用戶可以選擇向 G1 或 G2 發(fā)起讀操作。不管是哪臺服務(wù)器,只要收到請求,就必須告訴用戶,到底是 v0 還是 v1,否則就不滿足可用性。
kafka是一個分布式的消息隊(duì)列系統(tǒng),既然是一個分布式的系統(tǒng),那么就一定滿足CAP定律,那么在kafka當(dāng)中是如何遵循CAP定律的呢?kafka滿足CAP定律當(dāng)中的哪兩個呢?
kafka滿足的是CAP定律當(dāng)中的CA,其中Partition tolerance通過的是一定的機(jī)制盡量的保證分區(qū)容錯性。
其中C表示的是數(shù)據(jù)一致性。A表示數(shù)據(jù)可用性。
kafka首先將數(shù)據(jù)寫入到不同的分區(qū)里面去,每個分區(qū)又可能有好多個副本,數(shù)據(jù)首先寫入到leader分區(qū)里面去,讀寫的操作都是與leader分區(qū)進(jìn)行通信,保證了數(shù)據(jù)的一致性原則,也就是滿足了Consistency原則。
然后kafka通過分區(qū)副本機(jī)制,來保證了kafka當(dāng)中數(shù)據(jù)的可用性。但是也存在另外一個問題,就是副本分區(qū)當(dāng)中的數(shù)據(jù)與leader當(dāng)中的數(shù)據(jù)存在差別的問題如何解決,這個就是Partition tolerance的問題。
kafka為了解決Partition tolerance的問題,使用了ISR的同步策略,來盡最大可能減少Partition tolerance的問題。
每個leader會維護(hù)一個ISR(a set of in-sync replicas,基本同步)列表。
ISR列表主要的作用就是決定哪些副本分區(qū)是可用的,也就是說可以將leader分區(qū)里面的數(shù)據(jù)同步到副本分區(qū)里面去,決定一個副本分區(qū)是否可用的條件有兩個:
replica.lag.time.max.ms=10000 副本分區(qū)與主分區(qū)心跳時間延遲
replica.lag.max.messages=4000 副本分區(qū)與主分區(qū)消息同步最大差
produce 請求被認(rèn)為完成時的確認(rèn)值:request.required.acks=0
。
在開發(fā)工作中,消費(fèi)在Kafka集群中消息,數(shù)據(jù)變化是我們關(guān)注的問題,當(dāng)業(yè)務(wù)前提不復(fù)雜時,我們可以使用Kafka 命令提供帶有Zookeeper客戶端工具的工具,可以輕松完成我們的工作。隨著業(yè)務(wù)的復(fù)雜性,增加Group和 Topic,那么我們使用Kafka提供命令工具,已經(jīng)感到無能為力,那么Kafka監(jiān)控系統(tǒng)目前尤為重要,我們需要觀察 消費(fèi)者應(yīng)用的細(xì)節(jié)。
為了簡化開發(fā)者和服務(wù)工程師維護(hù)Kafka集群的工作有一個監(jiān)控管理工具,叫做 Kafka-eagle。這個管理工具可以很容易地發(fā)現(xiàn)分布在集群中的哪些topic分布不均勻,或者是分區(qū)在整個集群分布不均勻的的情況。它支持管理多個集群、選擇副本、副本重新分配以及創(chuàng)建Topic。同時,這個管理工具也是一個非常好的可以快速瀏覽這個集群的工具,
需要安裝jdk,啟動zk以及kafka的服務(wù)
kafka-eagle官網(wǎng):http://download.kafka-eagle.org/
我們可以從官網(wǎng)上面直接下載最細(xì)的安裝包即可kafka-eagle-bin-1.3.2.tar.gz這個版本即可
代碼托管地址:
https://github.com/smartloli/kafka-eagle/releases
這里我們選擇將kafak-eagle安裝在第三臺。
直接將kafka-eagle安裝包上傳到node03服務(wù)器的/export/softwares路徑下,然后進(jìn)行解壓node03服務(wù)器執(zhí)行一下命令進(jìn)行解壓。
kafka-eagle需要使用一個數(shù)據(jù)庫來保存一些元數(shù)據(jù)信息,我們這里直接使用msyql數(shù)據(jù)庫來保存即可,在node03服務(wù)器執(zhí)行以下命令創(chuàng)建一個mysql數(shù)據(jù)庫即可。
進(jìn)入mysql客戶端:
create database eagle;
修改kafak-eagle配置文件
執(zhí)行以下命令修改kafak-eagle配置文件:
vim system-config.properties
修改為如下:
kafka.eagle.zk.cluster.alias=cluster1,cluster2
cluster1.zk.list=node01:2181,node02:2181,node03:2181
cluster2.zk.list=node01:2181,node02:2181,node03:2181
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://node03:3306/eagle
kafka.eagle.username=root
kafka.eagle.password=123456
配置環(huán)境變量
kafka-eagle必須配置環(huán)境變量,node03服務(wù)器執(zhí)行以下命令來進(jìn)行配置環(huán)境變量: vim /etc/profile
:
export KE_HOME=/opt//kafka-eagle-bin-1.3.2/kafka-eagle-web-1.3.2
export PATH=:$KE_HOME/bin:$PATH
修改立即生效,執(zhí)行: source /etc/profile
執(zhí)行以下界面啟動kafka-eagle:
cd kafka-eagle-web-1.3.2/bin
chmod u+x ke.sh
./ke.sh start
主界面
訪問kafka-eagle
http://node03:8048/ke/account/signin?/ke/
用戶名:admin
密碼:123456
緩沖和削峰:上游數(shù)據(jù)時有突發(fā)流量,下游可能扛不住,或者下游沒有足夠多的機(jī)器來保證冗余,kafka在中間可以起到一個緩沖的作用,把消息暫存在kafka中,下游服務(wù)就可以按照自己的節(jié)奏進(jìn)行慢慢處理。
解耦和擴(kuò)展性:項(xiàng)目開始的時候,并不能確定具體需求。消息隊(duì)列可以作為一個接口層,解耦重要的業(yè)務(wù)流程。只需要遵守約定,針對數(shù)據(jù)編程即可獲取擴(kuò)展能力。
冗余:可以采用一對多的方式,一個生產(chǎn)者發(fā)布消息,可以被多個訂閱topic的服務(wù)消費(fèi)到,供多個毫無關(guān)聯(lián)的業(yè)務(wù)使用。
健壯性:消息隊(duì)列可以堆積請求,所以消費(fèi)端業(yè)務(wù)即使短時間死掉,也不會影響主要業(yè)務(wù)的正常進(jìn)行。
異步通信:很多時候,用戶不想也不需要立即處理消息。消息隊(duì)列提供了異步處理機(jī)制,允許用戶把一個消息放入隊(duì)列,但并不立即處理它。想向隊(duì)列中放入多少消息就放多少,然后在需要的時候再去處理它們。
kafka消費(fèi)消息的offset是定義在zookeeper中的,如果想重復(fù)消費(fèi)kafka的消息,可以在redis中自己記錄offset的checkpoint點(diǎn)(n個),當(dāng)想重復(fù)消費(fèi)消息時,通過讀取redis中的checkpoint點(diǎn)進(jìn)行zookeeper的offset重設(shè),這樣就可以達(dá)到重復(fù)消費(fèi)消息的目的了
kafka使用的是磁盤存儲。
速度快是因?yàn)椋?/p>
注:
分三個點(diǎn)說,一個是生產(chǎn)者端,一個消費(fèi)者端,一個broker端。
kafka的ack機(jī)制:在kafka發(fā)送數(shù)據(jù)的時候,每次發(fā)送消息都會有一個確認(rèn)反饋機(jī)制,確保消息正常的能夠被收到,其中狀態(tài)有0,1,-1。
如果是同步模式:
ack設(shè)置為0,風(fēng)險(xiǎn)很大,一般不建議設(shè)置為0。即使設(shè)置為1,也會隨著leader宕機(jī)丟失數(shù)據(jù)。所以如果要嚴(yán)格保證生產(chǎn)端數(shù)據(jù)不丟失,可設(shè)置為-1。
如果是異步模式:
也會考慮ack的狀態(tài),除此之外,異步模式下的有個buffer,通過buffer來進(jìn)行控制數(shù)據(jù)的發(fā)送,有兩個值來進(jìn)行控制,時間閾值與消息的數(shù)量閾值,如果buffer滿了數(shù)據(jù)還沒有發(fā)送出去,有個選項(xiàng)是配置是否立即清空buffer??梢栽O(shè)置為-1,永久阻塞,也就數(shù)據(jù)不再生產(chǎn)。異步模式下,即使設(shè)置為-1。也可能因?yàn)槌绦騿T的不科學(xué)操作,操作數(shù)據(jù)丟失,比如kill -9,但這是特別的例外情況。
注:
ack=0:producer不等待broker同步完成的確認(rèn),繼續(xù)發(fā)送下一條(批)信息。
ack=1(默認(rèn)):producer要等待leader成功收到數(shù)據(jù)并得到確認(rèn),才發(fā)送下一條message。
ack=-1:producer得到follwer確認(rèn),才發(fā)送下一條數(shù)據(jù)。
通過offset commit 來保證數(shù)據(jù)的不丟失,kafka自己記錄了每次消費(fèi)的offset數(shù)值,下次繼續(xù)消費(fèi)的時候,會接著上次的offset進(jìn)行消費(fèi)。
而offset的信息在kafka0.8版本之前保存在zookeeper中,在0.8版本之后保存到topic中,即使消費(fèi)者在運(yùn)行過程中掛掉了,再次啟動的時候會找到offset的值,找到之前消費(fèi)消息的位置,接著消費(fèi),由于 offset 的信息寫入的時候并不是每條消息消費(fèi)完成后都寫入的,所以這種情況有可能會造成重復(fù)消費(fèi),但是不會丟失消息。
唯一例外的情況是,我們在程序中給原本做不同功能的兩個consumer組設(shè)置KafkaSpoutConfig.bulider.setGroupid的時候設(shè)置成了一樣的groupid,這種情況會導(dǎo)致這兩個組共享同一份數(shù)據(jù),就會產(chǎn)生組A消費(fèi)partition1,partition2中的消息,組B消費(fèi)partition3的消息,這樣每個組消費(fèi)的消息都會丟失,都是不完整的。為了保證每個組都獨(dú)享一份消息數(shù)據(jù),groupid一定不要重復(fù)才行。
每個broker中的partition我們一般都會設(shè)置有replication(副本)的個數(shù),生產(chǎn)者寫入的時候首先根據(jù)分發(fā)策略(有partition按partition,有key按key,都沒有輪詢)寫入到leader中,follower(副本)再跟leader同步數(shù)據(jù),這樣有了備份,也可以保證消息數(shù)據(jù)的不丟失。
采集層 主要可以使用Flume, Kafka等技術(shù)。
Flume:Flume 是管道流方式,提供了很多的默認(rèn)實(shí)現(xiàn),讓用戶通過參數(shù)部署,及擴(kuò)展API.
Kafka:Kafka是一個可持久化的分布式的消息隊(duì)列。Kafka 是一個非常通用的系統(tǒng)。你可以有許多生產(chǎn)者和很多的消費(fèi)者共享多個主題Topics。
相比之下,Flume是一個專用工具被設(shè)計(jì)為旨在往HDFS,HBase發(fā)送數(shù)據(jù)。它對HDFS有特殊的優(yōu)化,并且集成了Hadoop的安全特性。
所以,Cloudera 建議如果數(shù)據(jù)被多個系統(tǒng)消費(fèi)的話,使用kafka;如果數(shù)據(jù)被設(shè)計(jì)給Hadoop使用,使用Flume。
kafka 宕機(jī)了,首先我們考慮的問題應(yīng)該是所提供的服務(wù)是否因?yàn)殄礄C(jī)的機(jī)器而受到影響,如果服務(wù)提供沒問題,如果實(shí)現(xiàn)做好了集群的容災(zāi)機(jī)制,那么這塊就不用擔(dān)心了。
想要恢復(fù)集群的節(jié)點(diǎn),主要的步驟就是通過日志分析來查看節(jié)點(diǎn)宕機(jī)的原因,從而解決,重新恢復(fù)節(jié)點(diǎn)。
在 Kafka 中,生產(chǎn)者寫入消息、消費(fèi)者讀取消息的操作都是與 leader 副本進(jìn)行交互的,從 而實(shí)現(xiàn)的是一種主寫主讀的生產(chǎn)消費(fèi)模型。Kafka 并不支持主寫從讀,因?yàn)橹鲗憦淖x有 2 個很明顯的缺點(diǎn):
數(shù)據(jù)一致性問題:數(shù)據(jù)從主節(jié)點(diǎn)轉(zhuǎn)到從節(jié)點(diǎn)必然會有一個延時的時間窗口,這個時間 窗口會導(dǎo)致主從節(jié)點(diǎn)之間的數(shù)據(jù)不一致。某一時刻,在主節(jié)點(diǎn)和從節(jié)點(diǎn)中 A 數(shù)據(jù)的值都為 X, 之后將主節(jié)點(diǎn)中 A 的值修改為 Y,那么在這個變更通知到從節(jié)點(diǎn)之前,應(yīng)用讀取從節(jié)點(diǎn)中的 A 數(shù)據(jù)的值并不為最新的 Y,由此便產(chǎn)生了數(shù)據(jù)不一致的問題。
延時問題:類似 Redis 這種組件,數(shù)據(jù)從寫入主節(jié)點(diǎn)到同步至從節(jié)點(diǎn)中的過程需要經(jīng)歷 網(wǎng)絡(luò)→主節(jié)點(diǎn)內(nèi)存→網(wǎng)絡(luò)→從節(jié)點(diǎn)內(nèi)存 這幾個階段,整個過程會耗費(fèi)一定的時間。而在 Kafka 中,主從同步會比 Redis 更加耗時,它需要經(jīng)歷 網(wǎng)絡(luò)→主節(jié)點(diǎn)內(nèi)存→主節(jié)點(diǎn)磁盤→網(wǎng)絡(luò)→從節(jié) 點(diǎn)內(nèi)存→從節(jié)點(diǎn)磁盤 這幾個階段。對延時敏感的應(yīng)用而言,主寫從讀的功能并不太適用。
而kafka的主寫主讀的優(yōu)點(diǎn)就很多了:
每個分區(qū)只能由同一個消費(fèi)組內(nèi)的一個消費(fèi)者(consumer)來消費(fèi),可以由不同的消費(fèi)組的消費(fèi)者來消費(fèi),同組的消費(fèi)者則起到并發(fā)的效果。
連接ZK集群,從ZK中拿到對應(yīng)topic的partition信息和partition的Leader的相關(guān)信息
連接到對應(yīng)Leader對應(yīng)的broker
consumer將?自?己保存的offset發(fā)送給Leader
Leader根據(jù)offset等信息定位到segment(索引?文件和?日志?文件)
根據(jù)索引?文件中的內(nèi)容,定位到?日志?文件中該偏移量量對應(yīng)的開始位置讀取相應(yīng)?長度的數(shù)據(jù)并返回給consumer
kafka只能保證partition內(nèi)是有序的,但是partition間的有序是沒辦法的。愛奇藝的搜索架構(gòu),是從業(yè)務(wù)上把需要有序的打到同?個partition。
如果是Kafka消費(fèi)能力不足,則可以考慮增加Topic的分區(qū)數(shù),并且同時提升消費(fèi)組的消費(fèi)者數(shù)量,消費(fèi)者數(shù)=分區(qū)數(shù)。(兩者缺一不可)
如果是下游的數(shù)據(jù)處理不及時:提高每批次拉取的數(shù)量。批次拉取數(shù)據(jù)過少(拉取數(shù)據(jù)/處理時間<生產(chǎn)速度),使處理的數(shù)據(jù)小于生產(chǎn)的數(shù)據(jù),也會造成數(shù)據(jù)積壓。
kafka對于消息體的大小默認(rèn)為單條最大值是1M但是在我們應(yīng)用場景中, 常常會出現(xiàn)一條消息大于1M,如果不對kafka進(jìn)行配置。則會出現(xiàn)生產(chǎn)者無法將消息推送到kafka或消費(fèi)者無法去消費(fèi)kafka里面的數(shù)據(jù), 這時我們就要對kafka進(jìn)行以下配置:server.properties
replica.fetch.max.bytes: 1048576 broker可復(fù)制的消息的最大字節(jié)數(shù), 默認(rèn)為1M
message.max.bytes: 1000012 kafka 會接收單個消息size的最大限制, 默認(rèn)為1M左右
注意:message.max.bytes必須小于等于replica.fetch.max.bytes,否則就會導(dǎo)致replica之間數(shù)據(jù)同步失敗。