/data/host1
, /data/host2
, 這個(gè)路徑也可以理解為是 Znode 的 Name/data/host1
, 其值是一個(gè)字符串 "192.168.0.1"
Zookeeper集群是一個(gè)基于主從框架的高可用集群
每個(gè)服務(wù)器承擔(dān)如下三種角色中的一種
Leader一個(gè)Zookeeper集群同一時(shí)間只會(huì)有一個(gè)實(shí)際工作的Leader,它會(huì)發(fā)起并維護(hù)與各Follwer及Observer間的心跳。所有的寫操作必須要通過Leader完成再有Leader將寫操作廣播給其他服務(wù)器。
Follower一個(gè)Zookeeper集群可能同時(shí)存在多個(gè)Follower,它會(huì)響應(yīng)Leader的心跳。
follower可直接處理并返回客戶端的讀請(qǐng)求,同時(shí)會(huì)將寫請(qǐng)求轉(zhuǎn)發(fā)給Leader處理,并且負(fù)責(zé)在Leader處理寫請(qǐng)求時(shí)對(duì)請(qǐng)求進(jìn)行投票。
Observer角色與Follower類似,但是無投票權(quán)。
數(shù)據(jù)發(fā)布/訂閱系統(tǒng),需要發(fā)布者將數(shù)據(jù)發(fā)布到Zookeeper的節(jié)點(diǎn)上,供訂閱者進(jìn)行數(shù)據(jù)訂閱,進(jìn)而達(dá)到動(dòng)態(tài)獲取數(shù)據(jù)的目的,實(shí)現(xiàn)配置信息的集中式管理和數(shù)據(jù)的動(dòng)態(tài)更新。
? 發(fā)布/訂閱一般有兩種設(shè)計(jì)模式:推模式和拉模式,服務(wù)端主動(dòng)將數(shù)據(jù)更新發(fā)送給所有訂閱的客戶端稱為推模式;客戶端主動(dòng)請(qǐng)求獲取最新數(shù)據(jù)稱為拉模式.
Zookeeper采用了推拉相結(jié)合的模式,客戶端向服務(wù)端注冊(cè)自己需要關(guān)注的節(jié)點(diǎn),一旦該節(jié)點(diǎn)數(shù)據(jù)發(fā)生變更,那么服務(wù)端就會(huì)向相應(yīng)的客戶端推送Watcher事件通知,客戶端接收到此通知后,主動(dòng)到服務(wù)端獲取最新的數(shù)據(jù)。
命名服務(wù)是分步實(shí)現(xiàn)系統(tǒng)中較為常見的一類場(chǎng)景,分布式系統(tǒng)中,被命名的實(shí)體通??梢允羌褐械臋C(jī)器、提供的服務(wù)地址或遠(yuǎn)程對(duì)象等,通過命名服務(wù),客戶端可以根據(jù)指定名字來獲取資源的實(shí)體,在分布式環(huán)境中,上層應(yīng)用僅僅需要一個(gè)全局唯一的名字。Zookeeper可以實(shí)現(xiàn)一套分布式全局唯一ID的分配機(jī)制。
通過調(diào)用Zookeeper節(jié)點(diǎn)創(chuàng)建的API接口就可以創(chuàng)建一個(gè)順序節(jié)點(diǎn),并且在API返回值中會(huì)返回這個(gè)節(jié)點(diǎn)的完整名字,利用此特性,可以生成全局ID,其步驟如下
1. 客戶端根據(jù)任務(wù)類型,在指定類型的任務(wù)下通過調(diào)用接口創(chuàng)建一個(gè)順序節(jié)點(diǎn),如"job-"。
2. 創(chuàng)建完成后,會(huì)返回一個(gè)完整的節(jié)點(diǎn)名,如"job-00000001"。
3. 客戶端拼接type類型和返回值后,就可以作為全局唯一ID了,如"type2-job-00000001"。
Zookeeper中特有的Watcher注冊(cè)于異步通知機(jī)制,能夠很好地實(shí)現(xiàn)分布式環(huán)境下不同機(jī)器,甚至不同系統(tǒng)之間的協(xié)調(diào)與通知,從而實(shí)現(xiàn)對(duì)數(shù)據(jù)變更的實(shí)時(shí)處理。通常的做法是不同的客戶端都對(duì)Zookeeper上的同一個(gè)數(shù)據(jù)節(jié)點(diǎn)進(jìn)行Watcher注冊(cè),監(jiān)聽數(shù)據(jù)節(jié)點(diǎn)的變化(包括節(jié)點(diǎn)本身和子節(jié)點(diǎn)),若數(shù)據(jù)節(jié)點(diǎn)發(fā)生變化,那么所有訂閱的客戶端都能夠接收到相應(yīng)的Watcher通知,并作出相應(yīng)處理。
在絕大多數(shù)分布式系統(tǒng)中,系統(tǒng)機(jī)器間的通信無外乎心跳檢測(cè)、工作進(jìn)度匯報(bào)和系統(tǒng)調(diào)度。
?、?心跳檢測(cè),不同機(jī)器間需要檢測(cè)到彼此是否在正常運(yùn)行,可以使用Zookeeper實(shí)現(xiàn)機(jī)器間的心跳檢測(cè),基于其臨時(shí)節(jié)點(diǎn)特性(臨時(shí)節(jié)點(diǎn)的生存周期是客戶端會(huì)話,客戶端若當(dāng)即后,其臨時(shí)節(jié)點(diǎn)自然不再存在),可以讓不同機(jī)器都在Zookeeper的一個(gè)指定節(jié)點(diǎn)下創(chuàng)建臨時(shí)子節(jié)點(diǎn),不同的機(jī)器之間可以根據(jù)這個(gè)臨時(shí)子節(jié)點(diǎn)來判斷對(duì)應(yīng)的客戶端機(jī)器是否存活。通過Zookeeper可以大大減少系統(tǒng)耦合。
② 工作進(jìn)度匯報(bào),通常任務(wù)被分發(fā)到不同機(jī)器后,需要實(shí)時(shí)地將自己的任務(wù)執(zhí)行進(jìn)度匯報(bào)給分發(fā)系統(tǒng),可以在Zookeeper上選擇一個(gè)節(jié)點(diǎn),每個(gè)任務(wù)客戶端都在這個(gè)節(jié)點(diǎn)下面創(chuàng)建臨時(shí)子節(jié)點(diǎn),這樣不僅可以判斷機(jī)器是否存活,同時(shí)各個(gè)機(jī)器可以將自己的任務(wù)執(zhí)行進(jìn)度寫到該臨時(shí)節(jié)點(diǎn)中去,以便中心系統(tǒng)能夠?qū)崟r(shí)獲取任務(wù)的執(zhí)行進(jìn)度。
?、?系統(tǒng)調(diào)度,Zookeeper能夠?qū)崿F(xiàn)如下系統(tǒng)調(diào)度模式:分布式系統(tǒng)由控制臺(tái)和一些客戶端系統(tǒng)兩部分構(gòu)成,控制臺(tái)的職責(zé)就是需要將一些指令信息發(fā)送給所有的客戶端,以控制他們進(jìn)行相應(yīng)的業(yè)務(wù)邏輯,后臺(tái)管理人員在控制臺(tái)上做一些操作,實(shí)際上就是修改Zookeeper上某些節(jié)點(diǎn)的數(shù)據(jù),Zookeeper可以把數(shù)據(jù)變更以時(shí)間通知的形式發(fā)送給訂閱客戶端。
分布式鎖用于控制分布式系統(tǒng)之間同步訪問共享資源的一種方式,可以保證不同系統(tǒng)訪問一個(gè)或一組資源時(shí)的一致性,主要分為排它鎖和共享鎖。
排它鎖又稱為寫鎖或獨(dú)占鎖,若事務(wù)T1對(duì)數(shù)據(jù)對(duì)象O1加上了排它鎖,那么在整個(gè)加鎖期間,只允許事務(wù)T1對(duì)O1進(jìn)行讀取和更新操作,其他任何事務(wù)都不能再對(duì)這個(gè)數(shù)據(jù)對(duì)象進(jìn)行任何類型的操作,直到T1釋放了排它鎖。
① 獲取鎖,在需要獲取排它鎖時(shí),所有客戶端通過調(diào)用接口,在/exclusive_lock節(jié)點(diǎn)下創(chuàng)建臨時(shí)子節(jié)點(diǎn)/exclusive_lock/lock。Zookeeper可以保證只有一個(gè)客戶端能夠創(chuàng)建成功,沒有成功的客戶端需要注冊(cè)/exclusive_lock節(jié)點(diǎn)監(jiān)聽。
?、?釋放鎖,當(dāng)獲取鎖的客戶端宕機(jī)或者正常完成業(yè)務(wù)邏輯都會(huì)導(dǎo)致臨時(shí)節(jié)點(diǎn)的刪除,此時(shí),所有在/exclusive_lock節(jié)點(diǎn)上注冊(cè)監(jiān)聽的客戶端都會(huì)收到通知,可以重新發(fā)起分布式鎖獲取。
共享鎖又稱為讀鎖,若事務(wù)T1對(duì)數(shù)據(jù)對(duì)象O1加上共享鎖,那么當(dāng)前事務(wù)只能對(duì)O1進(jìn)行讀取操作,其他事務(wù)也只能對(duì)這個(gè)數(shù)據(jù)對(duì)象加共享鎖,直到該數(shù)據(jù)對(duì)象上的所有共享鎖都被釋放。在需要獲取共享鎖時(shí),所有客戶端都會(huì)到/shared_lock下面創(chuàng)建一個(gè)臨時(shí)順序節(jié)點(diǎn)
有一些時(shí)候,多個(gè)團(tuán)隊(duì)需要共同完成一個(gè)任務(wù),比如,A團(tuán)隊(duì)將Hadoop集群計(jì)算的結(jié)果交給B團(tuán)隊(duì)繼續(xù)計(jì)算,B完成了自己任務(wù)再交給C團(tuán)隊(duì)繼續(xù)做。這就有點(diǎn)像業(yè)務(wù)系統(tǒng)的工作流一樣,一環(huán)一環(huán)地傳下 去.
分布式環(huán)境下,我們同樣需要一個(gè)類似單進(jìn)程隊(duì)列的組件,用來實(shí)現(xiàn)跨進(jìn)程、跨主機(jī)、跨網(wǎng)絡(luò)的數(shù)據(jù)共享和數(shù)據(jù)傳遞,這就是我們的分布式隊(duì)列。
Leader選舉是保證分布式數(shù)據(jù)一致性的關(guān)鍵所在。當(dāng)Zookeeper集群中的一臺(tái)服務(wù)器出現(xiàn)以下兩種情況之一時(shí),需要進(jìn)入Leader選舉。
若進(jìn)行Leader選舉,則至少需要兩臺(tái)機(jī)器,這里選取3臺(tái)機(jī)器組成的服務(wù)器集群為例。在集群初始化階段,當(dāng)有一臺(tái)服務(wù)器Server1啟動(dòng)時(shí),其單獨(dú)無法進(jìn)行和完成Leader選舉,當(dāng)?shù)诙_(tái)服務(wù)器Server2啟動(dòng)時(shí),此時(shí)兩臺(tái)機(jī)器可以相互通信,每臺(tái)機(jī)器都試圖找到Leader,于是進(jìn)入Leader選舉過程。選舉過程如下
(1) 每個(gè)Server發(fā)出一個(gè)投票。由于是初始情況,Server1和Server2都會(huì)將自己作為L(zhǎng)eader服務(wù)器來進(jìn)行投票,每次投票會(huì)包含所推舉的服務(wù)器的myid和ZXID,使用(myid, ZXID)來表示,此時(shí)Server1的投票為(1, 0),Server2的投票為(2, 0),然后各自將這個(gè)投票發(fā)給集群中其他機(jī)器。
(2) 接受來自各個(gè)服務(wù)器的投票。集群的每個(gè)服務(wù)器收到投票后,首先判斷該投票的有效性,如檢查是否是本輪投票、是否來自LOOKING狀態(tài)的服務(wù)器。
(3) 處理投票。針對(duì)每一個(gè)投票,服務(wù)器都需要將別人的投票和自己的投票進(jìn)行PK,PK規(guī)則如下
· 優(yōu)先檢查ZXID。ZXID比較大的服務(wù)器優(yōu)先作為L(zhǎng)eader。
· 如果ZXID相同,那么就比較myid。myid較大的服務(wù)器作為L(zhǎng)eader服務(wù)器。
對(duì)于Server1而言,它的投票是(1, 0),接收Server2的投票為(2, 0),首先會(huì)比較兩者的ZXID,均為0,再比較myid,此時(shí)Server2的myid最大,于是更新自己的投票為(2, 0),然后重新投票,對(duì)于Server2而言,其無須更新自己的投票,只是再次向集群中所有機(jī)器發(fā)出上一次投票信息即可。
(4) 統(tǒng)計(jì)投票。每次投票后,服務(wù)器都會(huì)統(tǒng)計(jì)投票信息,判斷是否已經(jīng)有過半機(jī)器接受到相同的投票信息,對(duì)于Server1、Server2而言,都統(tǒng)計(jì)出集群中已經(jīng)有兩臺(tái)機(jī)器接受了(2, 0)的投票信息,此時(shí)便認(rèn)為已經(jīng)選出了Leader。
(5) 改變服務(wù)器狀態(tài)。一旦確定了Leader,每個(gè)服務(wù)器就會(huì)更新自己的狀態(tài),如果是Follower,那么就變更為FOLLOWING,如果是Leader,就變更為L(zhǎng)EADING。
在Zookeeper運(yùn)行期間,Leader與非Leader服務(wù)器各司其職,即便當(dāng)有非Leader服務(wù)器宕機(jī)或新加入,此時(shí)也不會(huì)影響Leader,但是一旦Leader服務(wù)器掛了,那么整個(gè)集群將暫停對(duì)外服務(wù),進(jìn)入新一輪Leader選舉,其過程和啟動(dòng)時(shí)期的Leader選舉過程基本一致過程相同。
集群規(guī)劃
服務(wù)器IP | 主機(jī)名 | myid的值 |
---|---|---|
192.168.174.10 | hadoop01 | 1 |
192.168.174.11 | hadoop02 | 2 |
192.168.174.12 | hadoop03 | 3 |
第一步:下載zookeeeper的壓縮包,下載網(wǎng)址如下
我們?cè)谶@個(gè)網(wǎng)址下載我們使用的zk版本為3.4.9
下載完成之后,上傳到我們的linux的/root路徑下準(zhǔn)備進(jìn)行安裝
第二步:解壓
解壓zookeeper的壓縮包到/myapp/Zookeeper路徑下去,然后準(zhǔn)備進(jìn)行安裝
tar -zxf /root/zookeeper-3.4.9.tar.gz -C /myapp/Zookeeper/
第三步:修改配置文件
第一臺(tái)機(jī)器修改配置文件
cd /myapp/Zookeeper/zookeeper-3.4.9/conf/
cp zoo_sample.cfg zoo.cfg
mkdir -p /myapp/Zookeeper/zookeeper-3.4.9/zkdatas/
vim zoo.cfg
dataDir=/myapp/Zookeeper/zookeeper-3.4.9/zkdatas/
# 保留多少個(gè)快照
autopurge.snapRetainCount=3
# 日志多少小時(shí)清理一次
autopurge.purgeInterval=1
# 集群中服務(wù)器地址
server.1=hadoop01:2888:3888
server.2=hadoop02:2888:3888
server.3=hadoop03:2888:3888
第四步:添加myid配置
在第一臺(tái)機(jī)器的
/export/servers/zookeeper-3.4.9/zkdatas /這個(gè)路徑下創(chuàng)建一個(gè)文件,文件名為myid ,文件內(nèi)容為1
echo 1 > /myapp/Zookeeper/zookeeper-3.4.9/zkdatas/myid
第五步:安裝包分發(fā)并修改myid的值
安裝包分發(fā)到其他機(jī)器的前提是:其他主機(jī)也要有相應(yīng)的目錄。
第一臺(tái)機(jī)器上面執(zhí)行以下兩個(gè)命令
scp -r /myapp/Zookeeper/zookeeper-3.4.9/ node02:/myapp/Zookeeper/
scp -r /myapp/Zookeeper/zookeeper-3.4.9/ node03:/myapp/Zookeeper/
第二臺(tái)機(jī)器上修改myid的值為2
echo 2 > /myapp/Zookeeper/zookeeper-3.4.9/zkdatas/myid
第三臺(tái)機(jī)器上修改myid的值為3
echo 3 > /myapp/Zookeeper/zookeeper-3.4.9/zkdatas/myid
第六步:三臺(tái)機(jī)器啟動(dòng)zookeeper服務(wù)
三臺(tái)機(jī)器啟動(dòng)zookeeper服務(wù)
這個(gè)命令三臺(tái)機(jī)器都要執(zhí)行
/myapp/Zookeeper/zookeeper-3.4.9/bin/zkServer.sh start
查看啟動(dòng)狀態(tài)
/myapp/Zookeeper//zookeeper-3.4.9/bin/zkServer.sh status
#!/bin/bash
if [ -d /myapp/Zookeeper/zookeeper-3.4.9 ]; then
echo "Zookeeper已經(jīng)安裝"
else
echo "請(qǐng)輸入一共多少主機(jī)"
read rootsum
echo "請(qǐng)分別輸入主機(jī)名"
for ((i=1; i<=${rootsum}; i++)) {
echo "請(qǐng)輸入第${i}臺(tái)主機(jī)名:"
read rootname[i]
ssh ${rootname[i]} "mkdir -p /myapp/Zookeeper"
}
echo "請(qǐng)輸入Zookeeper壓縮包路徑"
read newfile
echo "正在為你安裝Zookeeper..........."
tar -zxf $newfile -C /myapp/Zookeeper/
cp /myapp/Zookeeper/zookeeper-3.4.9/conf/zoo_sample.cfg /myapp/Zookeeper/zookeeper-3.4.9/conf/zoo.cfg
sed -i -e '12d' /myapp/Zookeeper/zookeeper-3.4.9/conf/zoo.cfg
mkdir -p /myapp/Zookeeper/zookeeper-3.4.9/zkdatas/
arr1=("dataDir=/myapp/Zookeeper/zookeeper-3.4.9/zkdatas" "# 保留多少個(gè)快照" "autopurge.snapRetainCount=3" "# 日志多少小時(shí)清理一次" "autopurge.purgeInterval=1" "# 集群中服務(wù)器地址" "server.1=hadoop01:2888:3888" "server.2=hadoop02:2888:3888" "server.3=hadoop03:2888:3888")
#echo ${#arr1[*]}
for (( i=0; i<=${#arr1[*]}; i++)) {
echo ${arr1[$i]} >> /myapp/Zookeeper/zookeeper-3.4.9/conf/zoo.cfg
}
echo 1 >> /myapp/Zookeeper/zookeeper-3.4.9/zkdatas/myid
echo "==============================================================================="
echo "==============================================================================="
echo "==============================================================================="
for ((e=1; e<=${rootsum}; e++)){
if test $[ e + 1 ] -gt $rootsum; then
ssh ${rootname[e]} sed -i -e s/1/${e}/ /myapp/Zookeeper/zookeeper-3.4.9/zkdatas/myid
else
scp -r /myapp/Zookeeper/zookeeper-3.4.9 ${rootname[e + 1]}:/myapp/Zookeeper
ssh ${rootname[e]} sed -i -e s/1/${e}/ /myapp/Zookeeper/zookeeper-3.4.9/zkdatas/myid
fi
}
nl /myapp/Zookeeper/zookeeper-3.4.9/conf/zoo.cfg
for i in ${rootname[*]};
do
ssh $i "echo '#Zookeeper' >> /etc/profile"
ssh $i "echo 'export ZOOKEEPER_HOME=/myapp/Zookeeper/zookeeper-3.4.9' >> /etc/profile"
ssh $i "echo 'export PATH=\$PATH:\$ZOOKEEPER_HOME/bin' >> /etc/profile"
done
echo "==================================================================================="
echo "Zookeeper 安裝已完成"
fi
但也有不同之處:
9.1、
Zonde有兩種,分別為臨時(shí)節(jié)點(diǎn)和永久節(jié)點(diǎn)。節(jié)點(diǎn)的類型在創(chuàng)建時(shí)即被確定并且不能改變。
9.2、
Zonde還有一個(gè)序列化的特性,如果創(chuàng)建的時(shí)候指定的話,該Znode的名字后面會(huì)自動(dòng)追加一個(gè)不斷增加的序列號(hào)。序列號(hào)對(duì)于此節(jié)點(diǎn)的父節(jié)點(diǎn)來說是唯一的,這樣便會(huì)記錄每個(gè)子節(jié)點(diǎn)創(chuàng)建的先后順序。它的格式為"%10d"(10位數(shù)字,沒有數(shù)值的數(shù)位用0補(bǔ)充,例如"0000000001")
9.3、
這樣便會(huì)存在四種類型的Zonde節(jié)點(diǎn),分別對(duì)應(yīng):
bin/zkCli.sh -server hadoop01:2181
命令 | 說明 | 參數(shù) |
---|---|---|
create [-s] [-e] path data acl |
創(chuàng)建Znode | -s 指定是順序節(jié)點(diǎn) -e 指定是臨時(shí)節(jié)點(diǎn) |
ls path [watch] |
列出Path下所有子Znode | |
get path [watch] |
獲取Path對(duì)應(yīng)的Znode的數(shù)據(jù)和屬性 | |
ls2 path [watch] |
查看Path下所有子Znode以及子Znode的屬性 | |
set path data [version] |
更新節(jié)點(diǎn) | version 數(shù)據(jù)版本 |
delete path [version] |
刪除節(jié)點(diǎn), 如果要?jiǎng)h除的節(jié)點(diǎn)有子Znode則無法刪除 | version 數(shù)據(jù)版本 |
rmr path |
刪除節(jié)點(diǎn), 如果有子Znode則遞歸刪除 | |
setquota -n|-b val path |
修改Znode配額 | -n 設(shè)置子節(jié)點(diǎn)最大個(gè)數(shù) -b 設(shè)置節(jié)點(diǎn)數(shù)據(jù)最大長(zhǎng)度 |
history |
列出歷史記錄 |
create /app1 hello
create -s /app3 world
create -s /app3 world
create -s -e /tempnode2 aaa
get /app1
set /app1 xxx
delete /app1 刪除的節(jié)點(diǎn)不能有子節(jié)點(diǎn)
rmr /app1 遞歸刪除
Znode
Znode
, 需要使用路徑的形式, 例如 /test1/test11
Znode
中數(shù)據(jù)是有大小限制的, 最大只能為1M
Znode
是由三個(gè)部分構(gòu)成
stat
: 狀態(tài), Znode的權(quán)限信息, 版本等data
: 數(shù)據(jù), 每個(gè)Znode都是可以攜帶數(shù)據(jù)的, 無論是否有子節(jié)點(diǎn)children
: 子節(jié)點(diǎn)列表Znode
有兩大特性, 可以構(gòu)成四種不同類型的Znode
持久
客戶端斷開時(shí), 不會(huì)刪除持有的Znode臨時(shí)
客戶端斷開時(shí), 刪除所有持有的Znode, 臨時(shí)Znode不允許有子Znode有序
創(chuàng)建的Znode有先后順序, 順序就是在后面追加一個(gè)序列號(hào), 序列號(hào)是由父節(jié)點(diǎn)管理的自增無序
創(chuàng)建的Znode沒有先后順序每個(gè)Znode都包含一系列的屬性,通過命令get,可以獲得節(jié)點(diǎn)的屬性。
dataVersion
:數(shù)據(jù)版本, 每次當(dāng)Znode
中的數(shù)據(jù)發(fā)生變化的時(shí)候, dataVersion
都會(huì)增加1(即使設(shè)置的是相同的數(shù)據(jù)),可有效避免了數(shù)據(jù)更新時(shí)出現(xiàn)的先后順序問題。cversion
: 節(jié)點(diǎn)版本, 每次當(dāng)Znode
的節(jié)點(diǎn)發(fā)生變化的時(shí)候, cversion
都會(huì)增加1。aclVersion
:ACL(Access Control List)
的版本號(hào), 當(dāng)Znode
的權(quán)限信息發(fā)生變化的時(shí)候aclVersion會(huì)自增cZxid
:zonde創(chuàng)建的事務(wù)IDmZxid
:Zonde被修改的事務(wù)id,即每次對(duì)znode的修改都會(huì)更新mZxid。
ctime
:節(jié)點(diǎn)創(chuàng)建的時(shí)間戳。mtime
:節(jié)點(diǎn)最新 一次更新發(fā)生的時(shí)間戳。ephemeralOwner
:如果Znode
為臨時(shí)節(jié)點(diǎn), ephemeralOwner
表示與該節(jié)點(diǎn)關(guān)聯(lián)的SessionId
,如果不是臨時(shí)節(jié)點(diǎn)ephemeralOwner值為0。Session
中的, 客戶端和服務(wù)器創(chuàng)建一個(gè)連接的時(shí)候同時(shí)也會(huì)創(chuàng)建一個(gè)Session
Session
會(huì)在不同的狀態(tài)之間進(jìn)行切換: CONNECTING
, CONNECTED
, RECONNECTING
, RECONNECTED
, CLOSED
Watcher
, 當(dāng)Znode發(fā)生變化的時(shí)候, WatchManager
會(huì)調(diào)用對(duì)應(yīng)的Watcher
Watcher
會(huì)得到通知Watcher
的特點(diǎn)
Watcher
只會(huì)被觸發(fā)一次, 如果需要繼續(xù)監(jiān)聽, 則需要再次添加 Watcher
Watcher
得到的事件是被封裝過的, 包括三個(gè)內(nèi)容 keeperState, eventType, path
KeeperState | EventType | 觸發(fā)條件 | 說明 |
---|---|---|---|
None | 連接成功 | ||
SyncConnected | NodeCreated | Znode被創(chuàng)建 | 此時(shí)處于連接狀態(tài) |
SyncConnected | NodeDeleted | Znode被刪除 | 此時(shí)處于連接狀態(tài) |
SyncConnected | NodeDataChanged | Znode數(shù)據(jù)被改變 | 此時(shí)處于連接狀態(tài) |
SyncConnected | NodeChildChanged | Znode的子Znode數(shù)據(jù)被改變 | 此時(shí)處于連接狀態(tài) |
Disconnected | None | 客戶端和服務(wù)端斷開連接 | 此時(shí)客戶端和服務(wù)器處于斷開連接狀態(tài) |
Expired | None | 會(huì)話超時(shí) | 會(huì)收到一個(gè)SessionExpiredException |
AuthFailed | None | 權(quán)限驗(yàn)證失敗 | 會(huì)收到一個(gè)AuthFailedException |
這里操作Zookeeper的JavaAPI使用的是一套zookeeper客戶端框架Curator,解決了很多Zookeeper客戶端非常底層的細(xì)節(jié)開發(fā)工作。
Curator包含了幾個(gè)包:
Maven依賴(使用curator的版本:2.12.0,對(duì)應(yīng)Zookeeper的版本為:3.4.x,如果跨版本會(huì)有兼容性問題,很有可能導(dǎo)致節(jié)點(diǎn)操作失?。?/p>
創(chuàng)建maven java工程,導(dǎo)入jar包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>Zookeeper_JavaAPI</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>com.google.collect</groupId>
<artifactId>google-collections</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<!-- same version with the zookeeper-->
<version>3.4.9</version>
</dependency>
</dependencies>
</project>
package cn.itcast.zookeeper_api;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;
public class ZookeeperAPITest {
@Test
public void createZnode() throws Exception {
//1. 定制一個(gè)重試策略
/*
param1:重試的間隔時(shí)間
param2:重試的最大次數(shù)
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
//2. 獲取一個(gè)客戶端對(duì)象
/*
param1:要連接的Zookeeper服務(wù)器列表
param2:會(huì)話的超時(shí)時(shí)間
param3:鏈接超時(shí)時(shí)間
param4:重試策略
*/
String connectionStr="192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
//3。開啟客戶端
client.start();
//4. 創(chuàng)建永久節(jié)點(diǎn)
/*
PERSISTENT:永久節(jié)點(diǎn)
PERSISTENT_SEQUENTIAL:永久序列化節(jié)點(diǎn)
EPHEMERAL:臨時(shí)節(jié)點(diǎn)
EPHEMERAL_SEQUENTIAL:臨時(shí)序列化節(jié)點(diǎn)
*/
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/world01","world".getBytes());
//5. 關(guān)閉客戶端
client.close();
}
}
package cn.itcast.zookeeper_api;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;
public class ZookeeperAPITest {
@Test
public void createZnode() throws Exception {
//1. 定制一個(gè)重試策略
/*
param1:重試的間隔時(shí)間
param2:重試的最大次數(shù)
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
//2. 獲取一個(gè)客戶端對(duì)象
/*
param1:要連接的Zookeeper服務(wù)器列表
param2:會(huì)話的超時(shí)時(shí)間
param3:鏈接超時(shí)時(shí)間
param4:重試策略
*/
String connectionStr="192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
//3。開啟客戶端
client.start();
//4. 創(chuàng)建臨時(shí)節(jié)點(diǎn)
/*
PERSISTENT:永久節(jié)點(diǎn)
PERSISTENT_SEQUENTIAL:永久序列化節(jié)點(diǎn)
EPHEMERAL:臨時(shí)節(jié)點(diǎn)
EPHEMERAL_SEQUENTIAL:臨時(shí)序列化節(jié)點(diǎn)
*/
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/world01","world".getBytes());
Thread.sleep(5000);
//5. 關(guān)閉客戶端
client.close();
}
}
package cn.itcast.zookeeper_api;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;
public class ZookeeperAPITest {
@Test
public void SetZnodeData() throws Exception {
//1. 定制一個(gè)重試策略
/*
param1:重試的間隔時(shí)間
param2:重試的最大次數(shù)
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
//2. 獲取一個(gè)客戶端對(duì)象
/*
param1:要連接的Zookeeper服務(wù)器列表
param2:會(huì)話的超時(shí)時(shí)間
param3:鏈接超時(shí)時(shí)間
param4:重試策略
*/
String connectionStr="192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
//3。開啟客戶端
client.start();
//4. 修改節(jié)點(diǎn)數(shù)據(jù)
client.setData().forPath("/hell04","hello7".getBytes());
//5. 關(guān)閉客戶端
client.close();
}
}
package cn.itcast.zookeeper_api;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;
public class ZookeeperAPITest {
@Test
public void getZnodeData() throws Exception {
//1. 定制一個(gè)重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
//2. 獲取客戶端
String conectionStr="192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181,";
CuratorFramework client = CuratorFrameworkFactory.newClient(conectionStr,8000,8000,retryPolicy);
//3. 啟動(dòng)客戶端
client.start();
//4. 獲取節(jié)點(diǎn)數(shù)據(jù)
byte[] bytes = client.getData().forPath("/app1");
System.out.println(new String(bytes));
//5. 關(guān)閉客戶端
client.close();
}
}
package cn.itcast.zookeeper_api;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;
public class ZookeeperAPITest {
@Test
public void WatchZnode() throws Exception {
//1. 定制一個(gè)重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
//2. 獲取客戶端
String conectionStr="192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181,";
CuratorFramework client = CuratorFrameworkFactory.newClient(conectionStr,8000,8000,retryPolicy);
//3. 啟動(dòng)客戶端
client.start();
//4. 創(chuàng)建一個(gè)TreeCache對(duì)象,指定要監(jiān)控的節(jié)點(diǎn)路徑
final TreeCache treeCache = new TreeCache(client, "/hell04");
//5. 自定義一個(gè)監(jiān)聽器
treeCache.getListenable().addListener(new TreeCacheListener() {
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
ChildData data = treeCacheEvent.getData();
if (data != null) {
switch (treeCacheEvent.getType()){
case NODE_ADDED:
System.out.println("監(jiān)控到有新增節(jié)點(diǎn)!");
break;
case NODE_REMOVED:
System.out.println("監(jiān)控到有節(jié)點(diǎn)被移除!");
break;
case NODE_UPDATED:
System.out.println("監(jiān)控到節(jié)點(diǎn)被更新!");
break;
default:
break;
}
}
}
});
//6. 開始監(jiān)聽
treeCache.start();
//7. 設(shè)置休眠時(shí)間
Thread.sleep(100000000);
//8. 關(guān)閉客戶端
client.close();
}
}
聯(lián)系客服