1. 簡介kafka (官網(wǎng)地址:
http://kafka.apache.org)是一款分布式消息發(fā)布和訂閱的系統(tǒng),具有高性能和高吞吐率。
i. 消息的發(fā)布(publish)稱作producer,消息的訂閱(subscribe)稱作consumer,中間的存儲陣列稱作broker。
ii. 多個broker協(xié)同合作,producer、consumer和broker三者之間通過zookeeper來協(xié)調(diào)請求和轉(zhuǎn)發(fā)。
iii. producer產(chǎn)生和推送(push)數(shù)據(jù)到broker,consumer從broker拉取(pull)數(shù)據(jù)并進(jìn)行處理。
iv. broker端不維護(hù)數(shù)據(jù)的消費(fèi)狀態(tài),提升了性能。
v. 直接使用磁盤進(jìn)行存儲,線性讀寫,速度快:避免了數(shù)據(jù)在JVM內(nèi)存和系統(tǒng)內(nèi)存之間的復(fù)制,減少耗性能的創(chuàng)建對象和垃圾回收。
vi. Kafka使用scala編寫,可以運(yùn)行在JVM上。
2. 安裝:a. 首先安裝JRE/JDK
Linux安裝JDKb. 下載kafka
進(jìn)入下載頁面:
http://kafka.apache.org/downloads.html選擇Binary downloads下載 (Source download需要編譯才能使用)
也可以直接在linux終端下載:
- wget -q http://apache.fayea.com/apache-mirror/kafka/0.8.1/kafka_2.8.0-0.8.1.tgz
c. 解壓
- tar -xzvf kafka_2.8.0-0.8.1.tgz
- rm kafka_2.8.0-0.8.1.tgz
- cd kafka_2.8.0-0.8.1
目錄:
/bin 啟動和停止命令等。
/config 配置文件
/libs 類庫
d. 修改配置
Kafka默認(rèn)開啟JVM壓縮指針,但只是在64位的HotSpot VM受支持,如果安裝了32位的HotSpot VM,需要修改
/bin/kafka-run-class.sh文件
- vi bin/kafka-run-class.sh
找到如下行:
- KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
去除
-XX:+UseCompressedOops參數(shù)
3. 啟動和停止啟動Zookeeper server:
- bin/zookeeper-server-start.sh config/zookeeper.properties &
&是為了能退出命令行
啟動Kafka server:
- bin/kafka-server-start.sh config/server.properties &
停止Kafka server
停止Zookeeper server:
- bin/zookeeper-server-stop.sh
4. 單機(jī)連通性測試運(yùn)行producer:
- bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
早版本的Kafka,--broker-list localhost:9092需改為--zookeeper localhost:2181
運(yùn)行consumer:
- bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
在producer端輸入字符串并回車,查看consumer端是否顯示。
5. 分布式連通性測試Zookeeper Server, Kafka Server, Producer都放在服務(wù)器server1上,ip地址為192.168.1.10
Consumer放在服務(wù)器server2上,ip地址為192.168.1.12。
分別運(yùn)行server1的producer和server2的consumer,
- bin/kafka-console-producer.sh --broker-list 192.168.1.10:9092 --topic test
- bin/kafka-console-consumer.sh --zookeeper 192.168.1.10:2181 --topic test --from-beginning
在producer的console端輸入字符串,consumer報
Connection refused錯誤:
broker, producer和consumer都注冊到zookeeper上,producer和consumer的參數(shù)明確指定。問題出在broker的配置文件server.properties上:
- # Hostname the broker will bind to. If not set, the server will bind to all interfaces
- #host.name=localhost
host名稱沒有指定,就是127.0.0.1,consumer去broker拿數(shù)據(jù)就有問題。設(shè)置為192.168.1.10,重啟服務(wù)就好了。
本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請
點(diǎn)擊舉報。