大數(shù)據我們也許聽得很多了,一般都知道hadoop,但并不都是hadoop。那么我們該如何構建自己的大數(shù)據項目呢?對于離線處理,hadoop還是比較適合的,但對于實時性比較強的,數(shù)據量比較大的,可以采用spark,那spark又跟什么技術搭配才能做一個適合自己的項目呢?各技術之間又是如何整合的呢?這篇文章將給大家介紹下大數(shù)據項目中用到的各技術框架知識,并通過一個實際項目的分布式集群部署和實際業(yè)務應用來詳細講述大數(shù)據架構是如何構建的,供大家參考。
1大數(shù)據架構介紹
1.1 數(shù)據采集
負責從各節(jié)點上實時采集數(shù)據,選用flume來實現(xiàn)
Flume介紹
Flume是Cloudera(Hadoop數(shù)據管理軟件與服務提供商)提供的一個分布式、可靠、和高可用的海量日志采集、聚合和傳輸?shù)娜罩臼占到y(tǒng),支持在日志系統(tǒng)中定制各類數(shù)據發(fā)送方,用于收集數(shù)據;同時,F(xiàn)lume提供對數(shù)據進行簡單處理,并寫到各種數(shù)據接受方(可定制)的能力。
Flume的一些核心概念
Flume的數(shù)據流模型
Flume以agent為最小的獨立運行單位。一個agent就是一個JVM(JavaVirtual Machine)。單agent由Source、Sink和Channel三大組件構成,如下圖:
Flume的數(shù)據流由事件(Event)貫穿始終。事件是Flume的基本數(shù)據單位,它攜帶日志數(shù)據(字節(jié)數(shù)組形式)并且攜帶有頭信息,這些Event由Agent外部的Client,比如圖中的WebServer生成。當Source捕獲事件后會進行特定的格式化,然后Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩沖區(qū),它將保存事件直到Sink處理完該事件。Sink負責持久化日志或者把事件推向另一個Source。
Flume Sources
我們系統(tǒng)中使用的是Syslog Sources
Flume Sinks
我們系統(tǒng)中使用的是HDFS Sink和Kafka Sink
1.2、數(shù)據接入
由于采集數(shù)據的速度和數(shù)據處理的速度不一定同步,因此添加一個消息中間件來作為緩沖,選用apache的kafka,對于離線數(shù)據,選用hdfs
HDFS介紹
HDFS(Hadoop Distribute File System Hadoop分布式文件系統(tǒng))是一個運行在通用硬件上的分布式文件系統(tǒng),是ApacheHadoop Core項目的一部分。HDFS有著高容錯性(fault-tolerant)的特點,并且設計用來部署在低廉的(low-cost)硬件上。而且它提供高吞吐量(high throughput)來訪問應用程序的數(shù)據,適合那些有著超大數(shù)據集(large data set)的應用程序。HDFS放寬了(relax)POSIX的要求(requirements)這樣可以實現(xiàn)流的形式訪問(streaming access)文件系統(tǒng)中的數(shù)據。
HDFS的一些核心概念
HDFS的高可用性
在一個典型的HA(High Availability)集群中,每個NameNode是一臺獨立的服務器。在任一時刻,只有一個NameNode處于active狀態(tài),另一個處于standby狀態(tài)。其中,active狀態(tài)的NameNode負責所有的客戶端操作,standby狀態(tài)的NameNode處于從屬地位,維護著數(shù)據狀態(tài),隨時準備切換。
兩個NameNode為了數(shù)據同步,會通過一組稱作JournalNodes的獨立進程進行相互通信。當active狀態(tài)的NameNode的命名空間有任何修改時,會告知大部分的JournalNodes進程。standby狀態(tài)的NameNode有能力讀取JNs中的變更信息,并且一直監(jiān)控edit log的變化,把變化應用于自己的命名空間。standby可以確保在集群出錯時,命名空間狀態(tài)已經完全同步了,如圖所示
HDFS的體系結構
HDFS是一個主/從(Master/Slave)體系結構,從最終用戶的角度來看,它就像傳統(tǒng)的文件系統(tǒng)一樣,可以通過目錄路徑對文件執(zhí)行CRUD(Create、Read、Update和Delete)操作。但由于分布式存儲的性質,HDFS集群擁有一個NameNode和一些DataNode。NameNode管理文件系統(tǒng)的元數(shù)據,DataNode存儲實際的數(shù)據??蛻舳送ㄟ^同NameNode和DataNodes的交互訪問文件系統(tǒng)??蛻舳寺?lián)系NameNode以獲取文件的元數(shù)據,而真正的文件I/O操作是直接和DataNode進行交互的。如圖所示。
Kafka介紹
Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費者規(guī)模的網站中的所有動作流數(shù)據。這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現(xiàn)代網絡上的許多社會功能的一個關鍵因素。 這些數(shù)據通常是由于吞吐量的要求而通過處理日志和日志聚合來解決。 對于像Hadoop的一樣的日志數(shù)據和離線分析系統(tǒng),但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的并行加載機制來統(tǒng)一線上和離線的消息處理,也是為了通過集群機來提供實時的消費。
Kafka的一些核心概念
Kafka拓撲結構
一個典型的Kafka集群中包含若干Producer(可以是web前端產生的PageView,或者是服務器日志,系統(tǒng)CPU、Memory等),若干broker(Kafka支持水平擴展,一般broker數(shù)量越多,集群吞吐率越高),若干Consumer Group,以及一個Zookeeper集群。Kafka通過Zookeeper管理集群配置,選舉leader,以及在Consumer Group發(fā)生變化時進行rebalance。Producer使用push模式將消息發(fā)布到broker,Consumer使用pull模式從broker訂閱并消費消息。如圖所示
Zookeeper介紹
ZooKeeper是一個為分布式應用所設計的分布的、開源的協(xié)調服務。它提供了一些簡單的操作,使得分布式應用可以基于這些接口實現(xiàn)諸如配置維護、域名服務、分布式同步、組服務等。
Zookeeper很容易編程接入,它使用了一個和文件樹結構相似的數(shù)據模型??梢允褂肑ava或者C來進行編程接入。
Zookeeper的一些基本概念
Zookeeper的系統(tǒng)模型
Zookeeper的設計目的
(1)最終一致性
client不論連接到哪個Server,展示給它都是同一個視圖,這是zookeeper最重要的性能。
(2)可靠性
具有簡單、健壯、良好的性能,如果消息m被一臺服務器接受,那么它將被所有的服務器接受。
(3)實時性
Zookeeper保證客戶端將在一個時間間隔范圍內獲得服務器的更新信息,或者服務器失效的信息。但由于網絡延時等原因,Zookeeper不能保證兩個客戶端能同時得到剛更新的數(shù)據,如果需要最新數(shù)據,應該在讀數(shù)據之前調用sync()接口。
(4)等待無關(wait-free)
慢的或者失效的client不得干預快速的client的請求,使得每個client都能有效的等待
(5)原子性
更新只能成功或者失敗,沒有中間狀態(tài)。
(6)順序性
包括全局有序和偏序兩種:全局有序是指如果在一臺服務器上消息a在消息b前發(fā)布,則在所有Server上消息a都將在消息b前被發(fā)布;偏序是指如果一個消息b在消息a后被同一個發(fā)送者發(fā)布,a必將排在b前面。
1.3、流式計算
對采集到的數(shù)據進行實時分析,選用apache的Spark
Spark介紹
Spark是UCBerkeley AMP lab (加州大學伯克利分校的AMP實驗室)所開源的類Hadoop MapReduce的通用并行框架,而后又加入到Apache孵化器項目。Spark,擁有HadoopMapReduce所具有的優(yōu)點;但不同于MapReduce的是Job中間輸出結果可以保存在內存中,從而不再需要讀寫磁盤,因此Spark能更好地適用于數(shù)據挖掘與機器學習等需要迭代的MapReduce的算法。
Spark不僅實現(xiàn)了MapReduce的算子map 函數(shù)和reduce函數(shù)及計算模型,還提供更為豐富的算子,如filter、join、groupByKey等。是一個用來實現(xiàn)快速而通用的集群計算的平臺。
Spark支持多處理模式以及支持庫,Spark Streaming、Spark SQL、Spark MLlib、Spark GraphX等極大程度方便開發(fā)者在大數(shù)據處理方面的不同需求
Spark的生態(tài)體系
MapReduce屬于Hadoop生態(tài)體系之一,Spark則屬于BDAS(BerkeleyData Analytics Stack,中文:伯克利數(shù)據分析棧)生態(tài)體系之一
Hadoop包含了MapReduce、HDFS、HBase、Hive、Zookeeper、Pig、Sqoop等
BDAS包含了Spark、Shark(相當于Hive)、BlinkDB、Spark Streaming(消息實時處理框架,類似Storm)等等
Spark的運行模式
l Local (用于測試、開發(fā))
l Standlone (獨立集群模式)
l Spark on Yarn (Spark在Yarn上)
l Spark on Mesos (Spark在Mesos)
我們系統(tǒng)中使用的是Spark on Yarn
1.4、數(shù)據輸出
對分析后的結果持久化,可以使用HDFS、Hbase、MongoDB
Hbase介紹
HBase是ApacheHadoop的數(shù)據庫,能夠對大型數(shù)據提供隨機、實時的讀寫訪問。HBase的目標是存儲并處理大型的數(shù)據。HBase是一個開源的,分布式的,多版本的,面向列的存儲模型。它存儲的是松散型數(shù)據。
HBase是一個構建在HDFS上的分布式列存儲系統(tǒng);
HBase是基于GoogleBigTable模型開發(fā)的,典型的key/value系統(tǒng);
HBase是ApacheHadoop生態(tài)系統(tǒng)中的重要一員,主要用于海量結構化數(shù)據存儲;
從邏輯上講,HBase將數(shù)據按照表、行和列進行存儲。
與hadoop一樣,Hbase目標主要依靠橫向擴展,通過不斷增加廉價的商用服務器,來增加計算和存儲能力。
Hbase的系統(tǒng)架構
關于HBase與ZooKeeper,可以分三點來描述:
A、Zookeeper集群的職責
A.1、負責監(jiān)控整個hbase集群中節(jié)點的狀態(tài)和通信。
A.2、管理hbase 集群的-ROOT-表,即所有Region Server的地址和HTable信息。
A.3、避免HMsater的單點故障問題(重啟故障的HMaster;如果zkLeader掛掉,重新選舉出leader)。
B、HMaster Server的職責
B.1、為HRegionserver分配HRegion,并持續(xù)均衡負載;
B.2、當有HRegionserver失效時,由HMaster負責重新分配其上的HRegion。
C、HRegion Server的職責
C.1、維護HMaster分配的HRegin,響應客戶端的請求(增刪改查)。
C.2、管理.META.表數(shù)據,該表中包含當前HRegion Server上HRegion的相關信息。
C.3、負責region的切分,并將相關region切分信息更新到.META.表中。
MongoDB介紹
MongoDB是一個基于分布式文件存儲的數(shù)據庫。由C++語言編寫。旨在為WEB應用提供可擴展的高性能數(shù)據存儲解決方案。
MongoDB是一個介于關系數(shù)據庫和非關系數(shù)據庫之間的產品,是非關系數(shù)據庫當中功能最豐富,最像關系數(shù)據庫的。他支持的數(shù)據結構非常松散,是類似json的bson格式,因此可以存儲比較復雜的數(shù)據類型。MongoDB最大的特點是他支持的查詢語言非常強大,其語法有點類似于面向對象的查詢語言,幾乎可以實現(xiàn)類似關系數(shù)據庫單表查詢的絕大部分功能,而且還支持對數(shù)據建立索引。
MongoDB的一些基本概念
MongoDB的數(shù)據模型
一個MongoDB 實例可以包含一組數(shù)據庫,一個DataBase 可以包含一組Collection(集合),一個集合可以包含一組Document(文檔)。一個Document包含一組field(字段),每一個字段都是一個key/value pair。
key: 必須為字符串類型。
value:可以包含如下類型:
1、基本類型,例如,string,int,float,timestamp,binary 等類型。
2、一個document。
3、數(shù)組類型。
文檔結構如下:
2 HA模式下的分布式集群環(huán)境搭建
2.1、機器規(guī)劃
2.2、Hadoop+Hbase+Zookeeper集群部署
(1)修改各機器上的/etc/hosts文件,刪除文件中本機ip與 機器名對應的一行
作用:有時候/etc/hosts文件里對應的主機名包含一些特殊字符或其他原因導致不能正確解析主機名,最好是去掉這一行
(2)ssh免密碼登錄
作用:Hadoop中主節(jié)點管理從節(jié)點是通過SSH協(xié)議登錄到從節(jié)點實現(xiàn)的,而一般的SSH登錄,都是需要輸入密碼驗證的,為了Hadoop主節(jié)點方便管理成千上百的從節(jié)點,這里將主節(jié)點公鑰拷貝到從節(jié)點,實現(xiàn)SSH協(xié)議免秘鑰登錄,我這里做的是所有主從節(jié)點之間機器免秘鑰登錄
在三臺機器上分別執(zhí)行:
ssh-keygen -t rsa
chmod 755 用戶目錄
chmod 700 用戶目錄的~/.ssh
在機器上執(zhí)行了ssh-keygen -t rsa命令后會在用戶目錄的~/.ssh文件夾中生成這兩個文件:id_rsa和id_rsa.pub,其中id_rsa中存的算法rsa生成的私鑰,id_rsa.pub存放算法rsa生成的公鑰。
下一步,將所有機器上的id_rsa.pub中的內容匯總到某一臺機器上,并寫入文件:用戶目錄的~/.ssh/authorized_keys中。
注意:每臺機器的id_rsa.pub中的內容在文件authorized_keys中是一行
下一步:將用戶目錄的~/.ssh/authorized_keys文件復制到其他兩臺機器上
在每臺機器上執(zhí)行:
chmod 600 用戶目錄的~/.ssh/authorized_keys
(3)Zookeeper部署
A、下載zookeeper的安裝包, 解壓到合適目錄
B、修改配置文件:zoo.cfg
集群中部署了幾臺zookeeper機器,文件中就添加幾行,并且集群中的zookeeper機器上的zoo.cfg文件必須一致
(4)Hadoop部署
A、下載hadoop的安裝包, 解壓到合適目錄
B、配置hadoop
修改hadoopconf目錄下的文件,特別是core-site.xml,hadoop-env.sh,hdfs-site.xml,mapred-site.xml,yarn-site.xml,注意根據實際部署機器,修改ip和端口
復制hadoopconf目錄到集群中其他兩臺機器
(5)Hbase部署
A、下載hbase的安裝包, 解壓到合適目錄
B、配置Hbase
修改hbaseconf目錄下的文件,特別是backup-masters,hbase-env.sh,hbase-site.xml,regionservers,注意根據實際部署機器,修改ip和端口
復制hbaseconf目錄到集群中其他兩臺機器
(6)啟動集群
A、首次啟動集群
B、非首次啟動集群
(7)關閉集群
注意:一定要按順序停止,如果先停 ZooKeeper 再停 HBase 的話,基本停不下來
(8)HDFS實用工具
HDFS狀態(tài)查詢:bin/hdfsdfsadmin -report
離開安全模式:bin/hdfs dfsadmin -safemodeleave
文件目錄查詢:bin/hadoop fs -ls
文件內容查看:bin/hadoop fs -cat
文件上傳:bin/hadoop fs -put
文件下載:bin/hadoop fs -get
文件(夾)刪除:bin/hadoop fs -rm (-rmr)
(9)Hbase實用工具
連接hbase客戶端:bin/hbaseshell
shell命令:
表清單:list
表創(chuàng)建:
create 'UserProfile', {NAME => 'realtime',VERSIONS => 1, TTL => 2147483647},{NAME => 'profile', VERSIONS =>1, TTL => 2147483647}
表查詢:
scan 'UserProfile', LIMIT => 10
表數(shù)據構造:(put '表名','openid','列族:列','值')
put 'UserProfile','1222234','realtime:sex','1'
put 'UserProfile','4555554','realtime:age','25'
表刪除,分兩步:
disable 'UserProfile'
drop 'UserProfile'
查看表結構:
describe 'UserProfile'
(10)Kafka部署
A、下載kafka的安裝包,解壓到合適目錄
B、配置Kafka
修改kafkaconf目錄下的文件,特別是server.properties、zookeeper.properties、producer.properties、consumer.properties
,注意根據實際部署機器,修改ip和端口
C、啟動Kafka
kafka/bin/kafka-server-start.sh../config/server.properties
啟動成功后可通過jps命令查看進程
D、停止Kafka
kafka/bin/kafka-server-stop.sh
E、Kafka實用命令
數(shù)據查詢:bin/kafka-console-consumer.sh
數(shù)據構造:bin/kafka-console-producer.sh
Topic查詢修改:bin/kafka-topics.sh
(11)Flume部署
A、下載flume的安裝包,解壓到合適目錄
B、配置Flume
修改flume/conf/目錄下的文件,特別是core-site-lf.xml、hdfs-site-lf.xml、flume-wq.conf,注意根據實際部署機器,修改ip和端口
C、啟動Flume
flume/sbin/flume-ng agent --conf conf --conf-fileconf/flume-wq.conf --name a1
啟動成功后可通過jps命令查看進程
(12)Mongodb部署
A、下載mongodb的安裝包,解壓到合適目錄
B、配置Mongodb
修改/export/shadows/mongodb/conf目錄下的文件:configsvr.conf,mongos.conf,shard1.conf
C、啟動Mongodb
啟動配置服務:
mongodb/bin/mongod -f conf/configsvr.conf
啟動路由服務:
mongos -f conf/mongos.conf
啟動物理存儲:
mongod -f conf/shard1.conf
啟動成功后查看進程:ps -ef |grep mongo
D、Mongodb實用工具
連接客戶端:mongo --port 30000
客戶端命令:
數(shù)據庫清單:show databases;
數(shù)據庫切換:use tablename1;
集合(表)清單:show collections;
查詢表所有記錄:db.tablename1.find(); //相當于select * from tablename1;
條件查詢:db.tablename1.find({'groupId':'8037'}); //相當于select * from tablename1where groupId = 8037
3、實際業(yè)務應用
3.1、業(yè)務曝光
(1)配置中心修改如下手工服務為flume機器的ip和端口
mcoss_ds_mmart_show-------全量數(shù)據曝光
mcoss_ds_simpshow-----------簡要數(shù)據曝光
mcoss_ds_keystat--------------關鍵數(shù)據曝光
(2)訪問曝光tws服務
http://wq.jd.com/mcoss/mmart/show?actid=xxxx&pc=xxxx
(3)數(shù)據驗證
A hdfs數(shù)據驗證
hadoop fs -ls -h /data/flume 類似于ls命令
hadoop fs -cat /data/flume/xxx/xxx/xxx/xxx 類似于cat查看文件內容
下載文件到本地:
hadoop fs-get /data/flume/xxx/xxx/xxx/xxx /tmp/xxx
B Kafka數(shù)據驗證
查看有哪些topic
kafka/bin/kafka-topics.sh --list --zookeeper zookeeper機器ip:端口,zookeeper機器ip:端口,zookeeper機器ip:端口
數(shù)據查詢:
./kafka-console-consumer.sh --zookeeper zookeeper機器ip:端口,zookeeper機器ip:端口,zookeeper機器ip:端口 --from-beginning --topic mmart_show|more