分布式架構(gòu)、大數(shù)據(jù)架構(gòu)在軟件開發(fā)中使用頻率越來越高,然而在解決分布式數(shù)據(jù)一致性上,Zookeeper是最為成熟穩(wěn)定且被大規(guī)模應(yīng)用的解決方案,無論從性能、易用性還是穩(wěn)定性上來說,Zookeeper都已經(jīng)達到了一個工業(yè)級產(chǎn)品的標(biāo)準(zhǔn)。
Zookeeper是由Hadoop的子項目發(fā)展而來,于2010年11月正式成為了Apache的頂級項目。Zookeeper為分布式應(yīng)用提供了高效且可靠的分布式協(xié)調(diào)訪問,是一個典型的分布式數(shù)據(jù)一致性的解決方案,分布式應(yīng)用程序可以基于其實現(xiàn)諸如數(shù)據(jù)發(fā)布/訂閱、負(fù)載均衡、分布式協(xié)調(diào)/通知、集群管理、分布式鎖和分布式隊列等功能。
在解決分布式數(shù)據(jù)一致性方面,Zookeeper采用了一種被成為ZAB(Zookeeper Atomic Broadcast)的一致性協(xié)議。Zookeeper的設(shè)計目標(biāo)是將那些復(fù)雜且容易出錯的分布式一致性服務(wù)封裝起來,構(gòu)成一個高效可靠的原語集,并以一些列簡單易用的接口提供給用戶使用。
Zookeeper可以保證如下分布式一致性特性:
? 順序一致性:從同一個客戶端發(fā)起的事務(wù)請求,最終將會嚴(yán)格地按照其發(fā)起順序被應(yīng)用到Zookeeper中去;
? 原子性:所有事務(wù)請求的處理結(jié)果在整個集群中的所有機器上的應(yīng)用情況是一致的,不會出現(xiàn)集群中部分機器應(yīng)用了該事務(wù),而另外一部分沒有應(yīng)用的情況;
? 單一視圖:無論客戶端連接的是哪個Zookeeper服務(wù)器,其看到的服務(wù)端數(shù)據(jù)模型都是一致的;
? 可靠性:一旦服務(wù)端成功應(yīng)用了一個事務(wù),并完成對客戶端的訪問,那么該事務(wù)所引起的服務(wù)端狀態(tài)變更將會被一直保留下來,除非有另一個事務(wù)又對其進行了變更;
? 實時性:Zookeeper僅僅保證在一定的時間段內(nèi),客戶端最終一定能夠從服務(wù)端上讀取到最新的數(shù)據(jù)狀態(tài)
分布式系統(tǒng)就像一個動物園,混亂且難以管理,而ZooKeeper就是將這一切變得可控,由此理解,Zookeeper這個名字起的還是比較貼切的。Zookeeper作為一個輕量實用的工具,在Hadoop、HBase、Kafka、Dubbo等產(chǎn)品中都得以應(yīng)用,可謂是大數(shù)據(jù)、分布式領(lǐng)域的一把利器。
下面將會從源碼層次研究分析Zookeeper,通過源碼幫助我們深入理解Zookeeper實現(xiàn)思路,并提高我們對分布式一致性問題的認(rèn)識。
下圖是使用Sonar掃描Zookeeper源碼的大小情況統(tǒng)計信息:
從上圖統(tǒng)計信息可以看出Zookeeper代碼行數(shù)不到3萬行,有效代碼行大致只有1萬4千行,涉及到的類有382個,源碼文件210個,總體來看代碼量偏少,比較適合源碼研究分析。
上圖就是基于Zookeeper源碼繪制的運行的總體流程:
? 1、Zookeeper啟動類是QuorumPeerMain,并將配置文件通過參數(shù)方式傳入:
1 | QuorumPeerMain.main(new String[]{'D:\\Tools\\Zookeeper\\zookeeper-3.4.9\\conf\\zoo1.cfg'}); |
? 2、然后將傳入的配置文件進行解析獲取到QuorumPeerConfig配置類
? 3、然后啟動DatadirCleanupManager線程,由于Zookeeper的任何一個變更操作(增、刪、改)都將在transaction log中進行記錄,因為內(nèi)存中的數(shù)據(jù)掉電后會丟失,必須寫入到硬盤上的transaction log中;當(dāng)寫操作達到一定量或者一定時間間隔后,會對內(nèi)存中的數(shù)據(jù)進行一次快照并寫入到硬盤上的snap log中,主要為了縮短啟動時加載數(shù)據(jù)的時間從而加快系統(tǒng)啟動,另一方面避免transaction log日志數(shù)量過度膨脹。隨著運行時間的增長生成的transaction log和snapshot將越來越多,所以要定期清理,DatadirCleanupManager就是啟動一個TimeTask定時任務(wù)用于清理DataDir中的snapshot及對應(yīng)的transaction log
1 2 3 | DatadirCleanupManager主要有兩個參數(shù): snapRetainCount:清理后保留的snapshot的個數(shù),對應(yīng)配置:autopurge.snapRetainCount,大于等于3,默認(rèn)3 purgeInterval:清理任務(wù)TimeTask執(zhí)行周期,即幾個小時清理一次,對應(yīng)配置:autopurge.purgeInterval,單位:小時 |
? 4、根據(jù)配置中的servers數(shù)量判斷是集群環(huán)境還是單機環(huán)境,如果單機環(huán)境以standalone模式運行直接調(diào)用ZooKeeperServerMain.main()方法,這里就不細說,生產(chǎn)環(huán)境下主要是利用Zookeeper的集群環(huán)境,下面也主要是分析Zookeeper的集群環(huán)境下運行流程
1 2 3 4 | #如果zoo.cfg中配置如下信息,代碼中servers就會大于0,就會以集群環(huán)境運行;否則以standalone模式運行 server.1=127.0.0.1:2887:3887 server.2=127.0.0.1:2888:3888 server.3=127.0.0.1:2889:3889 |
? 5、創(chuàng)建ServerCnxnFactory實例,ServerCnxnFactory從名字就可以看出其是一個工廠類,負(fù)責(zé)管理ServerCnxn,ServerCnxn這個類代表了一個客戶端與一個server的連接,每個客戶端連接過來都會被封裝成一個ServerCnxn實例用來維護了服務(wù)器與客戶端之間的Socket通道。首先要有監(jiān)聽端口,客戶端連接才能過來,ServerCnxnFactory.configure()方法的核心就是啟動監(jiān)聽端口供客戶端連接進來,端口號由配置文件中clientPort屬性進行配置,默認(rèn)是2181:
1 2 | #zoo.cfg clientPort=2181 |
? 6、初始化QuorumPeer,Quorum在Zookeeper中代表集群中大多數(shù)節(jié)點的意思,即一半以上節(jié)點,Peer是端、節(jié)點的意思,Zookeeper集群中一半以上的節(jié)點其實就可以代表整個集群的狀態(tài),QuorumPeer就是管理維護的整個集群的一個核心類,這一步主要是創(chuàng)建一個QuorumPeer實例,并進行各種初始化工作,大致代碼如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | quorumPeer = getQuorumPeer();//創(chuàng)建QuorumPeer實例 quorumPeer.setQuorumPeers(config.getServers()); quorumPeer.setTxnFactory(new FileTxnSnapLog(//FileTxnSnapLog主要用于snap和transaction log的IO工具類 new File(config.getDataDir()), new File(config.getDataLogDir()))); quorumPeer.setElectionType(config.getElectionAlg());//選舉類型,用于確定選舉算法 quorumPeer.setMyid(config.getServerId());//myid用于區(qū)分不同端 quorumPeer.setTickTime(config.getTickTime()); quorumPeer.setInitLimit(config.getInitLimit()); quorumPeer.setSyncLimit(config.getSyncLimit()); quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs()); quorumPeer.setCnxnFactory(cnxnFactory);//ServerCnxnFactory客戶端請求管理工廠類 quorumPeer.setQuorumVerifier(config.getQuorumVerifier()); quorumPeer.setClientPortAddress(config.getClientPortAddress()); quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout()); quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));//ZKDatabase維護ZK在內(nèi)存中的數(shù)據(jù)結(jié)構(gòu) quorumPeer.setLearnerType(config.getPeerType()); quorumPeer.setSyncEnabled(config.getSyncEnabled()); |
? 7、QuorumPeer初始化完成后執(zhí)行:
1 2 | quorumPeer.start(); quorumPeer.join(); |
QuorumPeer.start()是Zookeeper中非常重要的一個方法入口,其代碼如下:
1 2 3 4 5 6 | public synchronized void start() { loadDataBase();//從事務(wù)日志目錄dataLogDir和數(shù)據(jù)快照目錄dataDir中恢復(fù)出DataTree數(shù)據(jù) cnxnFactory.start(); //開啟對客戶端的連接端口,啟動ServerCnxnFactory主線程 startLeaderElection();//創(chuàng)建出選舉算法 super.start();//啟動QuorumPeer線程,在該線程中進行服務(wù)器狀態(tài)的檢查 } |
start方法實現(xiàn)的業(yè)務(wù)主要包含四個方面:
? 1、loadDataBase:涉及到的核心類是ZKDatabase,并借助于FileTxnSnapLog工具類將snap和transaction log反序列化到內(nèi)存中,最終構(gòu)建出內(nèi)存數(shù)據(jù)結(jié)構(gòu)DataTree
? 2、cnxnFactory.start:之前介紹過ServerCnxnFactory作用,ServerCnxnFactory本身也可以作為一個線程,其run方法實現(xiàn)的大致邏輯是:構(gòu)建reactor模型的EventLoop,Selector每隔1秒執(zhí)行一次select方法來處理IO請求,并分發(fā)到對應(yīng)的代表該客戶端的ServerCnxn中并利用doIO進行處理。其核心代碼我簡化如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | while (!ss.socket().isClosed()) { selector.select(1000); Set<SelectionKey> selected; selected = selector.selectedKeys(); for (SelectionKey k : selected) { if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {//接收ACCEPT //新連接進入,創(chuàng)建一個ServerCnxn封裝用于維護該客戶端連接 } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { //客戶端有讀寫請求時,分發(fā)到代表相應(yīng)客戶端ServerCnxn中,使用doIO進行處理客戶端請求 NIOServerCnxn c = (NIOServerCnxn) k.attachment(); c.doIO(k); } } } |
? 3、startLeaderElection():這個主要是初始化一些Leader選舉工作,這部分的關(guān)鍵代碼在QuorumPeer.createElectionAlgorithm,大致如下:
1 2 3 4 5 6 7 8 | QuorumCnxManager qcm = createCnxnManager();//創(chuàng)建一個QuorumCnxManager實例 QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ listener.start();//Listener是一個線程,這里啟動Listener線程,主要啟動選舉監(jiān)聽端口并處理連接進來的Socket le = new FastLeaderElection(this, qcm);//選舉算法使用FastLeaderElection,存在好幾種算法實現(xiàn),但是其它集中算法實現(xiàn)都已經(jīng)慢慢廢棄 } else { LOG.error('Null listener when initializing cnx manager'); } |
Leader選舉涉及到節(jié)點間的網(wǎng)絡(luò)IO,QuorumCnxManager就是負(fù)責(zé)集群中各節(jié)點的網(wǎng)絡(luò)IO,QuorumCnxManager包含一個內(nèi)部類Listener,Listener是一個線程,這里啟動Listener線程,主要啟動選舉監(jiān)聽端口并處理連接進來的Socket;FastLeaderElection就是封裝了具體選舉算法的實現(xiàn)。
? 4、super.start():QuorumPeer本身也是一個線程,其繼承了Thread類,這里就是啟動QuorumPeer線程,就是執(zhí)行QuorumPeer.run方法,其偽代碼大致邏輯如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | while (running) { switch (getPeerState()) { case LOOKING: 當(dāng)前節(jié)點進入Leader選舉狀態(tài),執(zhí)行選舉分支流程 break; case OBSERVING: 當(dāng)前節(jié)點成為Observer角色時執(zhí)行分支流程 break; case FOLLOWING: 當(dāng)前節(jié)點成為Follower角色時執(zhí)行分支流程 break; case LEADING: 當(dāng)前節(jié)點成為Leader角色時執(zhí)行分支流程 break; } } |
QuorumPeer線程進入到一個無限循環(huán)模式,不停的通過getPeerState方法獲取當(dāng)前節(jié)點狀態(tài),然后執(zhí)行相應(yīng)的分支邏輯。大致流程可以簡單描述如下:
? a.首先系統(tǒng)剛啟動時serverState默認(rèn)是LOOKING,表示需要進行Leader選舉,這時進入Leader選舉狀態(tài)中,會調(diào)用FastLeaderElection.lookForLeader方法,lookForLeader方法內(nèi)部也包含了一個循環(huán)邏輯,直到選舉出Leader才會跳出lookForLeader方法,如果選舉出的Leader就是本節(jié)點,則將serverState=LEADING賦值,否則設(shè)置成FOLLOWING或OBSERVING
? b.然后,QuorumPeer.run進行下一輪次循環(huán),通過getPeerState獲取當(dāng)前serverState狀態(tài),如果是LEADING,則表示當(dāng)前節(jié)點當(dāng)選為LEADER,則進入Leader角色分支流程,執(zhí)行作為一個Leader該干的任務(wù);如果是FOLLOWING或OBSERVING,則進入Follower或Observer角色,并執(zhí)行其相應(yīng)的任務(wù)。注意:進入分支路程會一直阻塞在其分支中,直到角色轉(zhuǎn)變才會重新進行下一輪次循環(huán),比如Follower監(jiān)控到無法與Leader保持通信了,會將serverState賦值為LOOKING,跳出分支并進行下一輪次循環(huán),這時就會進入LOOKING分支中重新進行Leader選舉
到這里,已經(jīng)對Zookeeper的執(zhí)行流程有了一個簡單粗糙的分析,這一節(jié)主要是概覽性介紹,并沒有深入分析每個分支具體的實現(xiàn)細節(jié),主要是對Zookeeper運行的主體輪廓有個大體認(rèn)識。Zookeeper的實現(xiàn)細節(jié)還是有很多復(fù)雜性的,后面會通過專題的方式分析每個分支、每個流程實現(xiàn)邏輯及核心思想。