免费视频淫片aa毛片_日韩高清在线亚洲专区vr_日韩大片免费观看视频播放_亚洲欧美国产精品完整版

打開APP
userphoto
未登錄

開通VIP,暢享免費電子書等14項超值服

開通VIP
分布式計算開源框架Hadoop入門實踐

 分布式計算開源框架Hadoop入門實踐收藏

 | 舊一篇: 休假

Author :岑文初

Email: wenchu.cenwc@alibaba-inc.com

msn: cenwenchu_79@hotmail.com

blog: http://blog.csdn.net/cenwenchu79/

 

... 2

What is Hadoop. 2

Why is hadoop. 6

How to Use Hadoop & Tips 7

環(huán)境:... 7

部署考慮:... 7

實施步驟:... 7

Hadoop Command. 10

Hadoop基本流程以及簡單應(yīng)用的開發(fā)... 11

基本流程:... 11

代碼范例:... 13

Hadoop集群測試... 18

隨想... 19

 


       SIP項目設(shè)計的過程中,對于它龐大的日志在早先就考慮使用任務(wù)分解的多線程處理模式來分析統(tǒng)計,在前面有一篇Blog中提到了那部分的設(shè)計,但是由于統(tǒng)計的內(nèi)容暫時還是十分簡單,所以就采用Memcache作為計數(shù)器結(jié)合Mysql完成了訪問控制以及統(tǒng)計的工作。但未來,對于海量日志分析的工作,還是需要有所準備?,F(xiàn)在最火的技術(shù)詞匯莫過于“云計算”,在Open API日益盛行的今天,互聯(lián)網(wǎng)應(yīng)用的數(shù)據(jù)將會越來越有價值,如何去分析這些數(shù)據(jù),挖掘其內(nèi)在價值,就需要分布式計算來支撐起海量數(shù)據(jù)的分析工作。

       回過頭來看,早先那種多線程,多任務(wù)分解的日志分析設(shè)計,其實是分布式計算的一個單機版縮略,如何將這種單機的工作分拆,變成集群工作協(xié)同,其實就是分布式計算框架設(shè)計所涉及的。在去年參加BEA的大會時候,BEAVMWare合作采用虛擬機來構(gòu)建集群,無非就是希望使得計算機硬件能夠類似于應(yīng)用程序中的資源池中的資源,使用者無需關(guān)心資源的分配情況,最大化了硬件資源的使用價值。分布式計算也是如此,具體的計算任務(wù)交由哪一臺機器執(zhí)行,執(zhí)行后由誰來匯總,這都由分布式框架的Master來抉擇,而使用者只需簡單的將待分析內(nèi)容的提供給分布式計算系統(tǒng)作為輸入,就可以得到分布式計算后的結(jié)果。    HadoopApache開源組織的一個分布式計算開源框架,在很多大型網(wǎng)站上都已經(jīng)得到了應(yīng)用,亞馬遜,Facebook,Yahoo等等。對于我來說,最近的一個使用點就是服務(wù)集成平臺的日志分析,服務(wù)集成平臺的日志量將會很大,這也正好符合了分布式計算的適用場景(日志分析,索引建立就是兩大應(yīng)用場景)。

       當前沒有正式確定使用,所以也是自己業(yè)余摸索,后續(xù)所寫的相關(guān)內(nèi)容,都是一個新手的學習過程,難免會有一些錯誤,只是希望記錄下來可以分享給更多志同道合的朋友。

 

What is Hadoop

       搞什么東西之前,第一步是要知道What,然后是Why,最后才是How,但很多開發(fā)的朋友在做了多年項目以后,都習慣是先How,然后What,最后才是Why,這樣只會變得浮躁,同時往往會將技術(shù)誤用不適合的場景。

       Hadoop框架中最核心設(shè)計就是:MapReduceHDFS。MapReduce的思想是由Google的一篇論文所提及而被廣為流傳的,簡單的一句話解釋MapReduce就是任務(wù)的分解與結(jié)果的匯總。HDFSHadoop分布式文件系統(tǒng)的縮寫,為分布式計算存儲提供了底層支持。

       MapReduce從它名字上來看就大致可以看出個緣由,兩個動詞Map,Reduce,Map(展開)就是將一個任務(wù)分解成為多個任務(wù),Reduce就是將分解后多任務(wù)處理的結(jié)果匯總起來,得出最后的分析結(jié)果。這不是什么新思想,其實在前面提到了多線程,多任務(wù)的設(shè)計就可以找到這種思想的影子。不論是現(xiàn)實社會,還是在程序設(shè)計中,一項工作往往可以被拆分成為多個任務(wù),任務(wù)之間的關(guān)系可以分為兩種:一種是不相關(guān)的任務(wù),可以并行執(zhí)行;另一種是任務(wù)之間有相互的依賴,先后順序不能夠顛倒,這類任務(wù)是無法并行處理的?;氐竭^去,大學老師上課時讓大家去分析關(guān)鍵路徑,無非就是找最省時的任務(wù)分解執(zhí)行方式。在分布式系統(tǒng)中,機器集群就可以看作硬件資源池,將并行的任務(wù)拆分交由每一個空閑機器資源去處理,能夠極大地提高計算效率,同時這種資源無關(guān)性,對于計算集群的擴展無疑提供了最好的設(shè)計保證。(其實我一直認為Hadoop的卡通圖標不應(yīng)該是一個小象,應(yīng)該是螞蟻,分布式計算就好比螞蟻吃大象,廉價的機器群可以匹敵任何高性能的計算機,縱向擴展的曲線始終敵不過橫向擴展的斜線)。任務(wù)分解處理以后,那就需要將處理以后的結(jié)果在匯總起來,這就是Reduce要做的工作。

 

1 MapReduce

       上圖就是MapReduce大致的結(jié)構(gòu)圖,在Map前還可能會對輸入的數(shù)據(jù)有split的過程,保證任務(wù)并行效率,在Map之后還會有shuffle的過程,對于提高Reduce的效率以及減小數(shù)據(jù)傳輸?shù)膲毫τ泻艽蟮膸椭?。后面會具體提及這些部分的細節(jié)。

 

       HDFS是分布式計算的存儲基石,Hadoop的分布式文件系統(tǒng)和其他分布式文件系統(tǒng)有很多類似的特質(zhì)。

       分布式文件系統(tǒng)基本的幾個特點:

1.       對于整個集群有單一的命名空間。

2.       數(shù)據(jù)一致性。適合一次寫入多次讀取的模型,客戶端在文件沒有被成功創(chuàng)建之前是無法看到文件存在。

3.       文件會被分割成多個文件塊,每個文件塊被分配存儲到數(shù)據(jù)節(jié)點上,而且根據(jù)配置會有復(fù)制文件塊來保證數(shù)據(jù)的安全性。

 

2 HDFS

       上圖中展現(xiàn)了整個HDFS三個重要角色:NameNode,DataNode,Client

NameNode可以看作是分布式文件系統(tǒng)中的管理者,主要負責管理文件系統(tǒng)的命名空間,集群配置信息,存儲塊的復(fù)制。NameNode會存儲文件系統(tǒng)的Meta-data在內(nèi)存中,這些信息主要包括了文件信息,每一個文件對應(yīng)的文件塊的信息,每一個文件塊在DataNode的信息。

DataNode是文件存儲的基本單元。它存儲Block在本地文件系統(tǒng)中,保存了BlockMeta-data,同時周期性的發(fā)送所有存在的block的報告給NameNode。

Client就是需要獲取分布式文件系統(tǒng)文件的應(yīng)用程序。

這里通過三個操作來說明他們之間的交互關(guān)系。

 

文件寫入:

1.       ClientNameNode發(fā)起文件寫入的請求。

2.       NameNode根據(jù)文件大小和文件塊配置情況,返回給Client它所管理部分DataNode的信息。

3.       Client將文件劃分為多個Block,根據(jù)DataNode的地址信息,按順序?qū)懭氲矫恳粋€DataNode塊中。

 

文件讀?。?/font>

1.       ClientNameNode發(fā)起文件讀取的請求。

2.       NameNode返回文件存儲的DataNode的信息。

3.       Client讀取文件信息。

 

文件Block復(fù)制:

1.       NameNode發(fā)現(xiàn)部分文件的block不符合最小復(fù)制數(shù)或者部分DataNode失效。

2.       通知DataNode相互復(fù)制Block。

3.       DataNode開始直接相互復(fù)制。

 

最后在說一下HDFS的幾個設(shè)計特點:(對于框架設(shè)計值得借鑒)

1.  Block的放置

默認不配置,一個Block會有三份備份。一份放在NameNode指定的DataNode,另一份放在與指定DataNode非同一Rack上的DataNode,最后一份放在與指定DataNode同一Rack上的DataNode上。備份無非就是為了數(shù)據(jù)安全,考慮同一Rack的失敗情況以及不同Rack之間數(shù)據(jù)拷貝性能問題就采用這種配置方式。

 

2.  心跳檢測DataNode的健康狀況,如果發(fā)現(xiàn)問題就采取數(shù)據(jù)備份的方式來保證數(shù)據(jù)的安全性。

 

3.  數(shù)據(jù)復(fù)制。(DataNode失敗的時候,需要平衡DataNode的存儲利用率的時候,需要平衡DataNode數(shù)據(jù)交互壓力的時候)

這里先說一下,使用HDFSbalancer命令,可以配置一個Threshold來平衡每一個DataNode磁盤利用率。例如設(shè)置了Threshold10%,那么執(zhí)行balancer命令的時候,首先統(tǒng)計所有DataNode的磁盤利用率的均值,然后判斷如果某一個DataNode的磁盤利用率超過這個均值Threshold以上,那么將會把這個DataNodeblock轉(zhuǎn)移到磁盤利用率低的DataNode,這對于新節(jié)點的加入來說十分有用。

 

4.  數(shù)據(jù)交驗。采用CRC32作數(shù)據(jù)交驗。在文件Block寫入的時候除了寫入數(shù)據(jù)還會寫入交驗信息,在讀取的時候需要交驗后再讀入。

5.  NameNode是單點。如果失敗的話,任務(wù)處理信息將會紀錄在本地文件系統(tǒng)和遠端的文件系統(tǒng)中。

6.  數(shù)據(jù)管道性的寫入。

當客戶端要寫入文件到DataNode上,首先客戶端讀取一個Block然后寫到第一個DataNode上,然后由第一個DataNode傳遞到備份的DataNode上,一直到所有需要寫入這個BlockNataNode都成功寫入,客戶端才會繼續(xù)開始寫下一個Block。

7.  安全模式。

在分布式文件系統(tǒng)啟動的時候,開始的時候會有安全模式,當分布式文件系統(tǒng)處于安全模式的情況下,文件系統(tǒng)中的內(nèi)容不允許修改也不允許刪除,直到安全模式結(jié)束。安全模式主要是為了系統(tǒng)啟動的時候檢查各個DataNode上數(shù)據(jù)塊的有效性,同時根據(jù)策略必要的復(fù)制或者刪除部分數(shù)據(jù)塊。運行期通過命令也可以進入安全模式。在實踐過程中,系統(tǒng)啟動的時候去修改和刪除文件也會有安全模式不允許修改的出錯提示,只需要等待一會兒即可。

 

       綜合MapReduceHDFS來看Hadoop的結(jié)構(gòu):

 

 

3 Hadoop

       Hadoop的系統(tǒng)中,會有一臺Master,主要負責NameNode的工作以及JobTracker的工作。JobTracker是的主要職責就是啟動,跟蹤,調(diào)度各個Slave的任務(wù)執(zhí)行。還會有多臺Slave,每一臺Slave通常具有DataNode的功能以及TaskTracker的工作。TaskTracker根據(jù)應(yīng)用要求來結(jié)合本地數(shù)據(jù)執(zhí)行Map任務(wù)以及Reduce任務(wù)。

       說到這里,就要提到分布式計算的最重要的一個設(shè)計點:Moving Computation is Cheaper than Moving Data。就是在分布式處理中,移動數(shù)據(jù)的代價總是高于轉(zhuǎn)移計算的代價。簡單來說就是分而治之的工作,需要將數(shù)據(jù)也分而存儲,本地任務(wù)處理本地數(shù)據(jù)然后歸總,這樣才會保證分布式計算的高效性。


Why is hadoop

       說完了What,簡單的說一下Why。官方網(wǎng)站已經(jīng)給了很多的說明,這里就大致說一下其優(yōu)點及使用的場景(沒有不好的工具,只用不適用的工具,因此選擇好場景才能夠真正發(fā)揮分布式計算的作用)

1.  可擴展。不論是存儲的可擴展還是計算的可擴展都是Hadoop的設(shè)計根本。

2.  經(jīng)濟??蚣芸梢赃\行在任何普通的PC上。

3.  可靠。分布式文件系統(tǒng)的備份恢復(fù)機制以及MapReduce的任務(wù)監(jiān)控保證了分布式處理的可靠性。

4.  高效。分布式文件系統(tǒng)的高效數(shù)據(jù)交互實現(xiàn)以及MapReduce結(jié)合Local Data處理的模式,為高效處理海量的信息作了基礎(chǔ)準備。

 

使用場景:個人覺得最適合的就是海量數(shù)據(jù)的分析,其實Google最早提出MapReduce也就是為了海量數(shù)據(jù)分析。同時HDFS最早是為了搜索引擎實現(xiàn)而開發(fā)的,后來才被用于分布式計算框架中。

海量數(shù)據(jù)被分割于多個節(jié)點,然后由每一個節(jié)點并行計算,將得出結(jié)果歸并到輸出。同時第一階段的輸出又可以作為下一階段計算的輸入,因此可以想象到一個樹狀結(jié)構(gòu)的分布式計算圖,在不同階段都有不同產(chǎn)出,同時并行和串行結(jié)合的計算也可以很好的在分布式集群的資源下得以高效的處理。

 


How to Use Hadoop & Tips

其實參看Hadoop官方文檔已經(jīng)能夠很容易配置分布式框架運行環(huán)境了,不過這里既然寫了就再多寫一點,同時有一些細節(jié)需要注意的也說一下,其實也就是這些細節(jié)會讓人摸索半天。

Hadoop可以單機跑,也可以配置集群跑,單機跑就不需要多說了,只需要按照Demo的運行說明直接執(zhí)行命令即可。這里主要重點說一下集群配置運行的過程。

 

環(huán)境:

7臺普通的機器,操作系統(tǒng)都是linux。內(nèi)存和CPU就不說了,反正Hadoop一大特點就是機器在多不在精。JDK必須是1.5以上的,這個切記。7臺機器的機器名務(wù)必不同,后續(xù)會談到機器名對于MapReduce有很大的影響。

 

部署考慮:

正如上面我描述的,對于Hadoop的集群來說,可以分成兩大類角色,MasterSlave,前者主要配置NameNodeJobTracker的角色,負責總管分布式數(shù)據(jù)和分解任務(wù)的執(zhí)行,后者配置DataNodeTaskTracker的角色,負責分布式數(shù)據(jù)存儲以及任務(wù)的執(zhí)行。本來打算一臺機器是否可以配置成為Master同時也是Slave,不過發(fā)現(xiàn)在NameNode初始化的過程中以及TaskTracker執(zhí)行過程中機器名配置好像有沖突(NameNodeTaskTracker對于Hosts的配置有些沖突,究竟是把機器名對應(yīng)IP放在配置前面還是把Localhost對應(yīng)IP放在前面有點問題,不過可能也是我自己的問題吧,這個大家可以根據(jù)實施情況給我反饋)。最后反正決定一臺Master,六臺Slave,后續(xù)復(fù)雜的應(yīng)用開發(fā)和測試結(jié)果的比對會增加機器配置。

 

實施步驟:

1.  在所有的機器上都建立相同的目錄,也可以就建立相同的用戶,以該用戶的home路徑來做hadoop的安裝路徑。例如我在所有的機器上都建立了/home/wenchu。

2.  下載Hadoop,先解壓到Master上。這里我是下載的0.17.1的版本。此時Hadoop的安裝路徑就是/home/wenchu/hadoop-0.17.1。

3.  解壓后進入conf目錄,主要需要修改以下文件:hadoop-env.sh,hadoop-site.xml,masters,slaves

Hadoop的基礎(chǔ)配置文件是hadoop-default.xml,看Hadoop的代碼可以知道,默認建立一個Job的時候會建立JobConfig,Config首先讀入hadoop-default.xml的配置,然后再讀入hadoop-site.xml的配置(這個文件初始的時候配置為空),hadoop-site.xml中主要配置你需要覆蓋的hadoop-default.xml的系統(tǒng)級配置,以及你需要在你的MapReduce過程中使用的自定義配置(具體的一些使用例如final等參考文檔)。

 

以下是一個簡單的hadoop-site.xml的配置:

<?xml version="1.0"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

<property>

   <name>fs.default.name</name>//你的namenode的配置,機器名加端口

   <value>hdfs://10.2.224.46:54310/</value>

</property>

<property>

   <name>mapred.job.tracker</name>//你的JobTracker的配置,機器名加端口

   <value>hdfs://10.2.224.46:54311/</value>

</property>

<property>

   <name>dfs.replication</name>//數(shù)據(jù)需要備份的數(shù)量,默認是三

   <value>1</value>

</property>

<property>

    <name>hadoop.tmp.dir</name>//Hadoop的默認臨時路徑,這個最好配置,然后在新增節(jié)點或者其他情況下莫名其妙的DataNode啟動不了,就刪除此文件中的tmp目錄即可。不過如果刪除了NameNode機器的此目錄,那么就需要重新執(zhí)行NameNode格式化的命令了。

    <value>/home/wenchu/hadoop/tmp/</value>

</property>

<property>

   <name>mapred.child.java.opts</name>//java虛擬機的一些參數(shù)可以參照配置

   <value>-Xmx512m</value>

</property>

<property>

  <name>dfs.block.size</name>//block的大小,單位字節(jié),后面會提到用處,必須是512的倍數(shù),因為采用crc作文件完整性交驗,默認配置512checksum的最小單元。

  <value>5120000</value>

  <description>The default block size for new files.</description>

</property>

</configuration>

 

hadoop-env.sh文件只需要修改一個參數(shù):

# The java implementation to use.  Required.

export JAVA_HOME=/usr/ali/jdk1.5.0_10

配置你的Java路徑,記住一定要1.5版本以上,免得莫名其妙出現(xiàn)問題。

 

Masters中配置Mastersip或者機器名,如果是機器名那么需要在/etc/hosts中有所設(shè)置。

Slaves中配置的是Slavesip或者機器名,同樣如果是機器名需要在/etc/hosts中有所設(shè)置。

范例如下:我這里配置的都是ip.

Masters:

10.2.224.46

 

Slaves:

10.2.226.40

10.2.226.39

10.2.226.38

10.2.226.37

10.2.226.41

10.2.224.36

 

4.  建立Master到每一臺Slavessh受信證書。由于Master將會通過SSH啟動所有的SlaveHadoop,所以需要建立單向或者雙向證書保證命令執(zhí)行時不需要再輸入密碼。Master和所有的Slave機器上執(zhí)行:ssh-keygen -t rsa。執(zhí)行此命令的時候,看到提示只需要回車。然后就會在/root/.ssh/下面產(chǎn)生id_rsa.pub的證書文件,通過scpMaster機器上的這個文件拷貝到Slave上(記得修改名稱),例如:scp root@masterIP:/root/.ssh/id_rsa.pub /root/.ssh/46_rsa.pub,然后執(zhí)行cat /root/.ssh/46_rsa.pub >>/root/.ssh/authorized_keys,建立authorized_keys文件即可,可以打開這個文件看看,也就是rsa的公鑰作為keyuser@IP作為value。此時可以試驗一下,從master sshslave已經(jīng)不需要密碼了。由slave反向建立也是同樣,為什么要反向呢,其實如果一直都是Master啟動和關(guān)閉的話那么沒有必要建立反向,只是如果想在Slave也可以關(guān)閉Hadoop就需要建立反向。

5.  Master上的Hadoop通過scp拷貝到每一個Slave相同的目錄下,根據(jù)每一個SlaveJava_HOME的不同修改其hadoop-env.sh

6.  修改Master/etc/profile

新增以下內(nèi)容:具體的內(nèi)容根據(jù)你的安裝路徑修改,這步只是為了方便使用

export HADOOP_HOME=/home/wenchu/hadoop-0.17.1

export PATH=$PATH:$HADOOP_HOME/bin

 

修改完執(zhí)行 source /etc/profile來使得其生效。

7.  Master上執(zhí)行Hadoop namenode –format,這是第一需要做的初始化,可以看作格式化吧,以后除了在上面我提到過刪除了Master上的hadoop.tmp.dir目錄,否則是不需要再次執(zhí)行的。

8.  然后執(zhí)行Master上的start-all.sh,這個命令可以直接執(zhí)行,因為在6已經(jīng)添加到了path路徑了,這個命令是啟動hdfsmapreduce兩部分,當然你也可以分開單獨啟動hdfsmapreduce,分別是bin目錄下的start-dfs.shstart-mapred.sh。

9.  檢查Masterlogs目錄看看Namenode日志以及JobTracker日志是否正常啟動。

10.              檢查Slavelogs目錄看看Datanode日志以及TaskTracker日志是否正常。

11.              如果需要關(guān)閉,那么就直接執(zhí)行stop-all.sh即可。

 

以上步驟就可以啟動Hadoop的分布式環(huán)境,然后在Master的機器進入Master的安裝目錄,執(zhí)行hadoop jar hadoop-0.17.1-examples.jar wordcount 輸入路徑 輸出路徑,就可以看到字數(shù)統(tǒng)計的效果了。此處的輸入路徑和輸出路徑都指的是HDFS中的路徑,因此你可以首先通過拷貝本地文件系統(tǒng)中的目錄到HDFS中的方式來建立HDFS中的輸入路徑:

hadoop dfs -copyFromLocal /home/wenchu/test-in test-in。其中/home/wenchu/test-in是本地路徑,test-in是將會建立在HDFS中的路徑,執(zhí)行完畢以后可以通過hadoop dfs –ls可以看到test-in目錄已經(jīng)存在,同時可以通過hadoop dfs –ls test-in看來里面的內(nèi)容。輸出路徑要求是在HDFS中不存在的,當執(zhí)行完那個demo以后,就可以通過hadoop dfs –ls 輸出路徑看到其中的內(nèi)容,具體文件的內(nèi)容可以通過hadoop dfs –cat 文件名稱來查看。

 

注意事項:這部分是我在使用過程中花了一些時間走的彎路

1.  MasterSlave上的幾個conf配置文件不需要全部同步,如果確定都是通過Master去啟動和關(guān)閉,那么Slave機器上的配置不需要去維護。但如果希望在任意一臺機器都可以啟動和關(guān)閉Hadoop,那么就需要全部保持一致了。

2.  MasterSlave機器上的/etc/hosts中必須把集群中機器都配置上去,就算在各個配置文件中使用的是ip。這個吃過不少苦頭,原來以為如果配成ip就不需要去配置host,結(jié)果發(fā)現(xiàn)在執(zhí)行Reduce的時候總是卡住,在拷貝的時候就無法繼續(xù)下去,不斷重試。另外如果集群中如果有兩臺機器的機器名如果重復(fù)也會出現(xiàn)問題。

3.  如果在新增了節(jié)點或者刪除節(jié)點的時候出現(xiàn)了問題,首先就去刪除Slavehadoop.tmp.dir,然后重新啟動試試看,如果還是不行那就干脆把Masterhadoop.tmp.dir刪除(意味著dfs上的數(shù)據(jù)也會丟失),如果刪除了Masterhadoop.tmp.dir那么就需要重新namenode –format了。

4.  Map任務(wù)個數(shù)以及Reduce任務(wù)個數(shù)配置。前面分布式文件系統(tǒng)設(shè)計提到一個文件被放入到分布式文件系統(tǒng)中,會被分割成多個block放置到每一個的DataNode上,默認dfs.block.size應(yīng)該是64M,也就是說如果你放置到HDFS上的數(shù)據(jù)小于64,那么將只有一個Block,此時會被放置到某一個DataNode中,這個可以通過使用命令:hadoop dfsadmin –report就可以看到各個節(jié)點存儲的情況。也可以直接去某一個DataNode查看目錄:hadoop.tmp.dir/dfs/data/current就可以看到那些block了。Block的數(shù)量將會直接影響到Map的個數(shù)。當然可以通過配置來設(shè)定MapReduce的任務(wù)個數(shù)。Map的個數(shù)通常默認和HDFS需要處理的blocks相同。也可以通過配置Map的數(shù)量或者配置minimum split size來設(shè)定,實際的個數(shù)為:max(min(block_size,data/#maps),min_split_size)Reduce可以通過這個公式計算:0.95*num_nodes*mapred.tasktracker.tasks.maximum。

 

總的來說出了問題或者啟動的時候最好去看看日志,這樣心里有底。

 

Hadoop Command

       這部分內(nèi)容其實可以通過命令的Help以及介紹了解,我主要側(cè)重于介紹一下我用的比較多的幾個命令。

Hadoop dfs 這個命令后面加參數(shù)就是對于HDFS的操作,和linux操作系統(tǒng)的命令很類似,例如:

Hadoop dfs –ls 就是查看/usr/root目錄下的內(nèi)容,默認如果不填路徑這就是當前用戶路徑

Hadoop dfs –rmr xxx就是刪除目錄,還有很多命令看看就很容易上手

 

Hadoop dfsadmin –report 這個命令可以全局的查看DataNode的情況。

Hadoop job 后面增加參數(shù)是對于當前運行的Job的操作,例如list,kill

 

Hadoop balancer就是前面提到的均衡磁盤負載的命令。

 

其他就不詳細介紹了。

 

Hadoop基本流程以及簡單應(yīng)用的開發(fā)

基本流程:

 

 

 

 

 

 

 

 

一個圖片太大了,只好分割成為兩部分。根據(jù)流程圖來說一下具體的一個任務(wù)執(zhí)行的情況。

 

1.  分布式環(huán)境中客戶端創(chuàng)建任務(wù)并提交。

2.  InputFormatMap前的預(yù)處理,主要負責以下工作:

a)         驗證輸入的格式是否符合JobConfig的輸入定義,這個在實現(xiàn)Map和構(gòu)建Conf的時候就會知道,不定義可以是Writable的任意子類。

b)        input的文件split為邏輯上的輸入InputSplit,其實這就是在上面提到的在分布式文件系統(tǒng)中blocksize是有大小限制的,因此大文件會被劃分為多個block。

c)         通過RecordReader來再次處理inputsplit為一組records,輸出給Map。(inputsplit只是邏輯切分的第一步,但是如何根據(jù)文件中的信息來切分還需要RecordReader來實現(xiàn),例如最簡單的默認方式就是回車換行的切分)

 

3.  RecordReader處理后的結(jié)果作為Map的輸入,Map執(zhí)行定義的Map邏輯,輸出處理后的key,value對到臨時中間文件。

4.  Combiner可選擇配置,主要作用是在每一個Map執(zhí)行完分析以后,在本地優(yōu)先作Reduce的工作,減少在Reduce過程中的數(shù)據(jù)傳輸量。

5.  Partitioner可選擇配置,主要作用是在多個Reduce的情況下,指定Map的結(jié)果由某一個Reduce處理,每一個Reduce都會有單獨的輸出文件。(后面的代碼實例中有介紹使用場景)

6.  Reduce執(zhí)行具體的業(yè)務(wù)邏輯,并且將處理結(jié)果輸出給OutputFormat。

7.  OutputFormat的職責是,驗證輸出目錄是否已經(jīng)存在,同時驗證輸出結(jié)果類型是否如Config中配置,最后輸出Reduce匯總后的結(jié)果。

 

 

 

代碼范例:

 

業(yè)務(wù)場景描述:

           可設(shè)定輸入和輸出路徑(操作系統(tǒng)的路徑非HDFS路徑),根據(jù)訪問日志分析某一個應(yīng)用訪問某一個API的總次數(shù)和總流量,統(tǒng)計后分別輸出到兩個文件中。

 

僅僅為了測試,因此沒有去細分很多類,將所有的類都歸并于一個類便于說明問題。

 

 

 

 

 

4 測試代碼類圖

       LogAnalysiser就是主類,主要負責創(chuàng)建,提交任務(wù),并且輸出部分信息。內(nèi)部的幾個子類用途可以參看流程中提到的角色職責。具體的看看幾個類和方法的代碼片斷:

 

LogAnalysiser::MapClass

         public static class MapClass extends MapReduceBase

       implements Mapper<LongWritable, Text, Text, LongWritable>

         {

                   public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter)

                                     throws IOException

                   {       

                            String line = value.toString();//沒有配置RecordReader,所以默認采用line的實現(xiàn),key就是行號,value就是行內(nèi)容

                            if (line == null || line.equals(""))

                                     return;

                            String[] words = line.split(",");

                            if (words == null || words.length < 8)

                                     return;

                            String appid = words[1];

                            String apiName = words[2];

                            LongWritable recbytes = new LongWritable(Long.parseLong(words[7]));

                            Text record = new Text();

                            record.set(new StringBuffer("flow::").append(appid)

                                                                 .append("::").append(apiName).toString());

                            reporter.progress();

                            output.collect(record, recbytes);//輸出流量的統(tǒng)計結(jié)果,通過flow::作為前綴來標示。

                            record.clear();

                            record.set(new StringBuffer("count::").append(appid).append("::").append(apiName).toString());

                            output.collect(record, new LongWritable(1));//輸出次數(shù)的統(tǒng)計結(jié)果,通過count::作為前綴來標示

                   }       

         }

 

LogAnalysiser:: PartitionerClass

    public static class PartitionerClass implements Partitioner<Text, LongWritable>

      {

           public int getPartition(Text key, LongWritable value, int numPartitions)

           {

                 if (numPartitions >= 2)//Reduce 個數(shù),判斷流量還是次數(shù)的統(tǒng)計分配到不同的Reduce

                      if (key.toString().startsWith("flow::"))

                            return 0;

                      else

                            return 1;

                 else

                      return 0;

           }

           public void configure(JobConf job){}  

}

LogAnalysiser:: CombinerClass

參看ReduceClass,通常兩者可以使用一個,不過這里有些不同的處理就分成了兩個。在ReduceClass中藍色的行表示在CombinerClass中不存在。

 

LogAnalysiser:: ReduceClass

    public static class ReduceClass extends MapReduceBase

           implements Reducer<Text, LongWritable,Text, LongWritable>

      {

           public void reduce(Text key, Iterator<LongWritable> values,

                      OutputCollector<Text, LongWritable> output, Reporter reporter)throws IOException

           {

                 Text newkey = new Text();

                 newkey.set(key.toString().substring(key.toString().indexOf("::")+2));

                 LongWritable result = new LongWritable();

                 long tmp = 0;

                 int counter = 0;

                 while(values.hasNext())//累加同一個key的統(tǒng)計結(jié)果

                 {

                      tmp = tmp + values.next().get();

                     

                      counter = counter +1;//擔心處理太久,JobTracker長時間沒有收到報告會認為TaskTracker已經(jīng)失效,因此定時報告一下

                      if (counter == 1000)

                      {

                            counter = 0;

                            reporter.progress();

                      }

                 }

                 result.set(tmp);

                 output.collect(newkey, result);//輸出最后的匯總結(jié)果

           }    

      }

 

LogAnalysiser

      public static void main(String[] args)

      {

           try

           {

                 run(args);

           } catch (Exception e)

           {

                 e.printStackTrace();

           }

      }

    public static void run(String[] args) throws Exception

      {

           if (args == null || args.length <2)

           {

                 System.out.println("need inputpath and outputpath");

                 return;

           }

           String inputpath = args[0];

           String outputpath = args[1];

           String shortin = args[0];

           String shortout = args[1];

           if (shortin.indexOf(File.separator) >= 0)

                 shortin = shortin.substring(shortin.lastIndexOf(File.separator));

           if (shortout.indexOf(File.separator) >= 0)

                 shortout = shortout.substring(shortout.lastIndexOf(File.separator));

           SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd");

           shortout = new StringBuffer(shortout).append("-")

                 .append(formater.format(new Date())).toString();

          

          

           if (!shortin.startsWith("/"))

                 shortin = "/" + shortin;

           if (!shortout.startsWith("/"))

                 shortout = "/" + shortout;

           shortin = "/user/root" + shortin;

           shortout = "/user/root" + shortout;              

           File inputdir = new File(inputpath);

           File outputdir = new File(outputpath);

           if (!inputdir.exists() || !inputdir.isDirectory())

           {

                 System.out.println("inputpath not exist or isn‘t dir!");

                 return;

           }

           if (!outputdir.exists())

           {

                 new File(outputpath).mkdirs();

           }

          

           JobConf conf = new JobConf(new Configuration(),LogAnalysiser.class);//構(gòu)建Config

           FileSystem fileSys = FileSystem.get(conf);

           fileSys.copyFromLocalFile(new Path(inputpath), new Path(shortin));//將本地文件系統(tǒng)的文件拷貝到HDFS

 

           conf.setJobName("analysisjob");

           conf.setOutputKeyClass(Text.class);//輸出的key類型,在OutputFormat會檢查

           conf.setOutputValueClass(LongWritable.class); //輸出的value類型,在OutputFormat會檢查

           conf.setMapperClass(MapClass.class);

           conf.setCombinerClass(CombinerClass.class);

           conf.setReducerClass(ReduceClass.class);

           conf.setPartitionerClass(PartitionerClass.class);

           conf.set("mapred.reduce.tasks", "2");//強制需要有兩個Reduce來分別處理流量和次數(shù)的統(tǒng)計

           FileInputFormat.setInputPaths(conf, shortin);//hdfs中的輸入路徑

           FileOutputFormat.setOutputPath(conf, new Path(shortout));//hdfs中輸出路徑

          

           Date startTime = new Date();

            System.out.println("Job started: " + startTime);

            JobClient.runJob(conf);

            Date end_time = new Date();

            System.out.println("Job ended: " + end_time);

            System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");

            //刪除輸入和輸出的臨時文件

           fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath));

           fileSys.delete(new Path(shortin),true);

           fileSys.delete(new Path(shortout),true);

      }

以上的代碼就完成了所有的邏輯性代碼,然后還需要一個注冊驅(qū)動類來注冊業(yè)務(wù)Class為一個可標示的命令,讓hadoop jar可以執(zhí)行。

public class ExampleDriver {

  public static void main(String argv[]){

    ProgramDriver pgd = new ProgramDriver();

    try {

      pgd.addClass("analysislog", LogAnalysiser.class, "A map/reduce program that analysis log .");

      pgd.driver(argv);

    }

    catch(Throwable e){

      e.printStackTrace();

    }

  }

}

將代碼打成jar,并且設(shè)置jarmainClassExampleDriver這個類。

 

在分布式環(huán)境啟動以后執(zhí)行如下語句:

hadoop jar analysiser.jar analysislog /home/wenchu/test-in /home/wenchu/test-out

 

/home/wenchu/test-in中是需要分析的日志文件,執(zhí)行后就會看見整個執(zhí)行過程,包括了Map,Reduce的進度。執(zhí)行完畢會在/home/wenchu/test-out下看到輸出的內(nèi)容。有兩個文件:part-00000part-00001分別記錄了統(tǒng)計后的結(jié)果。

       如果需要看執(zhí)行的具體情況,可以看在輸出目錄下的_logs/history/xxxx_analysisjob,里面羅列了所有的MapReduce的創(chuàng)建情況以及執(zhí)行情況。

 

在運行期也可以通過瀏覽器來查看Map,Reduce的情況:

http://MasterIP:50030/jobtracker.jsp

 

 


Hadoop集群測試

       首先這里使用上面的范例作為測試,也沒有做太多的優(yōu)化配置,這個測試結(jié)果只是為了看看集群的效果,以及一些參數(shù)配置的影響。

 

文件復(fù)制數(shù)為1,blocksize 5M

Slave數(shù)

處理記錄數(shù)(萬條)

執(zhí)行時間(秒)

2

95

38

2

950

337

4

95

24

4

950

178

6

95

21

6

950

114

 

Blocksize 5M

Slave數(shù)

處理記錄數(shù)(萬條)

執(zhí)行時間(秒)

2(文件復(fù)制數(shù)為1

950

337

2(文件復(fù)制數(shù)為3

950

339

6(文件復(fù)制數(shù)為1

950

114

6(文件復(fù)制數(shù)為3

950

117

 

文件復(fù)制數(shù)為1

 

Slave數(shù)

處理記錄數(shù)(萬條)

執(zhí)行時間(秒)

6(blocksize 5M)

95

21

6(blocksize 77M)

95

26

4(blocksize 5M)

950

178

4(blocksize 50M)

950

54

6(blocksize 5M)

950

114

6(blocksize 50M)

950

44

6(blocksize 77M)

950

74

 

測試的數(shù)據(jù)結(jié)果很穩(wěn)定,基本測幾次同樣條件下都是一樣。

測試結(jié)果可以看出一下幾點:

1.       機器數(shù)對于性能還是有幫助的(等于沒說^_^)。

2.       文件復(fù)制數(shù)的增加只對安全性有幫助,但是對于性能沒有太多幫助。而且現(xiàn)在采取的是將操作系統(tǒng)文件拷貝到HDFS中,所以備份多了,準備的時間很長。

3.       blocksize對于性能影響很大,首先如果將block劃分的太小,那么將會增加job的數(shù)量,同時也增加了協(xié)作的代價,降低了性能,但是配置的太大也會讓job不能最大化并行處理。所以這個值的配置需要根據(jù)數(shù)據(jù)處理的量來考慮。

4.       最后就是除了這個表里面列出來的結(jié)果,應(yīng)該去仔細看輸出目錄中的_logs/history中的xxx_analysisjob這個文件,里面記錄了全部的執(zhí)行過程以及讀寫情況。這個可以更加清楚地了解哪里可能會更加耗時。

 

 

隨想

       “云計算”熱的燙手,就和SAAS,Web2,SNS等等一樣,往往都是在搞概念,只有真正踏踏實實的那些大型的互聯(lián)網(wǎng)公司,才會投入人力物力去研究符合自己的分布式計算。其實當你的數(shù)據(jù)量沒有那么大的時候,這種分布式計算也就僅僅只是一個玩具而已,真正只有解決問題的過程中,它深層次的問題才會被挖掘出來。

       這篇文章僅僅是為了給對分布式計算有興趣的朋友拋個磚,要想真的掘到金子,那么就踏踏實實的去用,去想,去分析。后續(xù)自己也會更進一步的去研究框架中的實現(xiàn)機制,在解決自己問題的同時,也能夠貢獻一些什么。

       前幾日看到有人跪求成為架構(gòu)師的方式,看了有些可悲,有些可笑,其實有多少架構(gòu)師知道什么叫做架構(gòu),架構(gòu)師的職責是什么,與其追求這么一個名號,還不如踏踏實實的作塊石頭沉到水底,積累和沉淀的過程就是一種成長。

本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊舉報。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
生活服務(wù)
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點擊這里聯(lián)系客服!

聯(lián)系客服