免费视频淫片aa毛片_日韩高清在线亚洲专区vr_日韩大片免费观看视频播放_亚洲欧美国产精品完整版

打開(kāi)APP
userphoto
未登錄

開(kāi)通VIP,暢享免費(fèi)電子書(shū)等14項(xiàng)超值服

開(kāi)通VIP
基于Hadoop生態(tài)SparkStreaming的大數(shù)據(jù)實(shí)時(shí)流處理平臺(tái)的搭建

隨著公司業(yè)務(wù)發(fā)展,對(duì)大數(shù)據(jù)的獲取和實(shí)時(shí)處理的要求就會(huì)越來(lái)越高,日志處理、用戶行為分析、場(chǎng)景業(yè)務(wù)分析等等,傳統(tǒng)的寫(xiě)日志方式根本滿足不了業(yè)務(wù)的實(shí)時(shí)處理需求,所以本人準(zhǔn)備開(kāi)始著手改造原系統(tǒng)中的數(shù)據(jù)處理方式,重新搭建一個(gè)實(shí)時(shí)流處理平臺(tái),主要是基于Hadoop生態(tài),利用Kafka作為中轉(zhuǎn),SparkStreaming框架實(shí)時(shí)獲取數(shù)據(jù)并清洗,將結(jié)果多維度的存儲(chǔ)進(jìn)HBase數(shù)據(jù)庫(kù)。


整個(gè)平臺(tái)大致的框架如下:



操作系統(tǒng):Centos7


用到的框架:

 1. Flume1.8.0
 2. Hadoop2.9.0
 3. kafka2.11-1.0.0
 4. Spark2.2.1
 5. HBase1.2.6
 6. ZooKeeper3.4.11
 7. maven3.5.2


整體的開(kāi)發(fā)環(huán)境是基于JDK1.8以上以及Scala,所以得提前把java和Scala的環(huán)境給準(zhǔn)備好,接下來(lái)就開(kāi)始著手搭建基礎(chǔ)平臺(tái):


一、配置開(kāi)發(fā)環(huán)境


下載并解壓JDK1.8,、下載并解壓Scala,配置profile文件:


vim /etc/profile


export JAVA_HOME=/usr/java/jdk1.8.0_144
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export SCALA_HOME=/usr/local/scala-2.11.12
export PATH=$PATH:$SCALA_HOME/bin


source /etc/profile


二、配置zookeeper、maven環(huán)境


下載并解壓zookeeper以及maven并配置profile文件


wget http://mirrors.hust.edu.cn/apache/maven/maven-3/3.5.2/binaries/apache-maven-3.5.2-bin.tar.gz
tar -zxvf apache-maven-3.5.2-bin.tar.gz -C /usr/local
wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz
tar -zxvf zookeeper-3.4.11.tar.gz -C /usr/local
vim /etc/profile


export MAVEN_HOME=/usr/local/apache-maven-3.5.2
export PATH=$PATH:$MAVEN_HOME/bin


source /etc/profile


zookeeper的配置文件配置一下:


cp /usr/local/zookeeper-3.4.11/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.11/conf/zoo.cfg


然后配置一下zoo.cfg里面的相關(guān)配置,指定一下dataDir目錄等等


啟動(dòng)zookeeper:


/usr/local/zookeeper-3.4.11/bin/zkServer.sh start


如果不報(bào)錯(cuò),jps看一下是否啟動(dòng)成功


三、安裝配置Hadoop


Hadoop的安裝配置在之前文章中有說(shuō)過(guò)(傳送門(mén)),為了下面的步驟方便理解,這里只做一個(gè)單機(jī)版的簡(jiǎn)單配置說(shuō)明:


下載hadoop解壓并配置環(huán)境:


wget http://mirrors.hust.edu.cn/apache/hadoop/common/hadoop-2.9.0/hadoop-2.9.0.tar.gz
tar -zxvf hadoop-2.9.0.tar.gz -C /usr/local
vim /etc/profile


export HADOOP_HOME=/usr/local/hadoop-2.9.0
export PATH=$PATH:$HADOOP_HOME/bin


source /etc/profile


配置hadoop 進(jìn)入/usr/local/hadoop-2.9.0/etc/hadoop目錄


cd /usr/local/hadoop-2.9.0/etc/hadoop


首先配置hadoop-env.sh、yarn-env.sh,修改JAVA_HOME到指定的JDK安裝目錄/usr/local/java/jdk1.8.0_144


創(chuàng)建hadoop的工作目錄


mkdir /opt/data/hadoop


編輯core-site.xml、hdfs-site.xml、yarn-site.xml等相關(guān)配置文件,具體配置不再闡述請(qǐng)看前面的文章,配置完成之后記得執(zhí)行hadoop namenode -format,否則hdfs啟動(dòng)會(huì)報(bào)錯(cuò),啟動(dòng)完成后不出問(wèn)題瀏覽器訪問(wèn)50070端口會(huì)看到hadoop的頁(yè)面。


四、安裝配置kafka


還是一樣,先下載kafka,然后配置:


wget http://mirrors.hust.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -zxvf kafka_2.11-1.0.0.tgz -C /usr/local
vim /etc/profile


export KAFKA_HOME=/usr/local/kafka_2.11-1.0.0
export PATH=$KAFKA_HOME/bin:$PATH


source /etc/profile


進(jìn)入kafka的config目錄,配置server.properties,指定log.dirs和zookeeper.connect參數(shù);配置zookeeper.properties文件中zookeeper的dataDir,配置完成后啟動(dòng)kafka


kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties


可以用jps查看有沒(méi)有kafka進(jìn)程,然后測(cè)試一下kafka是否能夠正常收發(fā)消息,開(kāi)兩個(gè)終端,一個(gè)用來(lái)做producer發(fā)消息一個(gè)用來(lái)做consumer收消息,首先,先創(chuàng)建一個(gè)topic


kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic testTopic
kafka-topics.sh --describe --zookeeper localhost:2181 --topic testTopic


如果不出一下會(huì)看到如下輸出:


Topic:testTopic    PartitionCount:1    ReplicationFactor:1 Configs:
Topic: testTopic    Partition: 0    Leader: 0   Replicas: 0 Isr: 0


然后在第一個(gè)終端中輸入命令:


kafka-console-producer.sh –broker-list localhost:9092 –topic testTopic


在第二個(gè)終端中輸入命令:


kafka-console-consumer.sh –zookeeper 127.0.0.1:2181 –topic testTopic


如果啟動(dòng)都正常,那么這兩個(gè)終端將進(jìn)入阻塞監(jiān)聽(tīng)狀態(tài),在第一個(gè)終端中輸入任何消息第二個(gè)終端都將會(huì)接收到。


五、安裝配置HBase


下載并解壓HBase:


wget http://mirrors.hust.edu.cn/apache/hbase/1.2.6/hbase-1.2.6-bin.tar.gz
tar -zxvf hbase-1.2.6-bin.tar.gz -C /usr/local/
vim /etc/profile


export HBASE_HOME=/usr/local/hbase-1.2.6
export PATH=$PATH:$HBASE_HOME/bin


source /etc/profile


修改hbase下的配置文件,首先修改hbase-env.sh,主要修改JAVA_HOME以及相關(guān)參數(shù),這里要說(shuō)明一下HBASE_MANAGES_ZK這個(gè)參數(shù),因?yàn)椴捎昧俗约旱膠ookeeper,所以這里設(shè)置為false,否則hbase會(huì)自己?jiǎn)?dòng)一個(gè)zookeeper


cd /usr/local/hbase-1.2.6/conf
vim hbase-env.sh


export JAVA_HOME=/usr/local/java/jdk1.8.0_144/
HBASE_CLASSPATH=/usr/local/hbase-1.2.6/conf
export HBASE_MASTER_OPTS='$HBASE_MASTER_OPTS -XX:PermSize=256m -XX:MaxPermSize=1024m'
export HBASE_REGIONSERVER_OPTS='$HBASE_REGIONSERVER_OPTS -XX:PermSize=256m -XX:MaxPermSize=1024m'
export HBASE_PID_DIR=/opt/data/hbase
export HBASE_MANAGES_ZK=false


然后修改hbase-site.xml,我們?cè)O(shè)置hbase的文件放在hdfs中,所以要設(shè)置hdfs地址,其中tsk1是我安裝hadoop的機(jī)器的hostname,hbase.zookeeper.quorum參數(shù)是安裝zookeeper的地址,這里的各種地址最好用機(jī)器名


vim hbase-site.xml


configuration>
   property>
       name>hbase.rootdirname>
       value>hdfs://tsk1:9000/hbasevalue>
   property>
   property>
       name>hbase.mastername>
       value>tsk1:60000value>
   property>
   property>
       name>hbase.master.portname>
       value>60000value>
   property>
   property>
       name>hbase.cluster.distributedname>
       value>truevalue>
   property>
   property>
       name>hbase.zookeeper.quorumname>
       value>192.168.70.135value>
   property>
   property>
       name>zookeeper.znode.parentname>
       value>/hbasevalue>
   property>
   property>
       name>hbase.zookeeper.property.dataDirname>
       value>/opt/data/zookeepervalue>
   property>
   property>
       name>hbase.master.info.bindAddressname>
       value>tsk1value>
   property>
configuration>


配置完成后啟動(dòng)hbase,輸入命令:


start-hbase.sh


完成后查看日志沒(méi)有報(bào)錯(cuò)的話測(cè)試一下hbase,用hbase shell進(jìn)行測(cè)試:


hbase shell
hbase(main):001:0>create 'myTestTable','info'
0 row(s) in 2.2460 seconds
=> Hbase::Table - myTestTable
hbase(main):003:0>list
TABLE                                                                                                                    
testTable                                                                                                                
1 row(s) in 0.1530 seconds

=> ['myTestTable']


至此,hbase搭建成功,訪問(wèn)以下hadoop的頁(yè)面,查看file system(菜單欄Utilities->Browse the file system),這時(shí)可以看見(jiàn)base的相關(guān)文件已經(jīng)載hadoop的文件系統(tǒng)中。


六、安裝spark


下載spark并解壓


wget http://mirrors.hust.edu.cn/apache/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz
tar -zxvf spark-2.2.1-bin-hadoop2.7.tgz -C /usr/local
vim /etc/profile


export SPARK_HOME=/usr/local/spark-2.2.1-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin


source /etc/profile


七、測(cè)試


至此,環(huán)境基本搭建完成,以上搭建的環(huán)境僅是服務(wù)器生產(chǎn)環(huán)境的一部分,涉及服務(wù)器信息、具體調(diào)優(yōu)信息以及集群的搭建就不寫(xiě)在這里了,下面我們寫(xiě)一段代碼整體測(cè)試一下從kafka生產(chǎn)消息到spark streaming接收到,然后處理消息并寫(xiě)入HBase。先寫(xiě)一個(gè)HBase的連接類HBaseHelper:


public class HBaseHelper {
   private static HBaseHelper ME;
   private static Configuration config;
   private static Connection conn;
   private static HBaseAdmin admin;
   public static HBaseHelper getInstances() {
       if (null == ME) {
           ME = new HBaseHelper();
           config = HBaseConfiguration.create();
           config.set('hbase.rootdir', 'hdfs://tsk1:9000/hbase');
           config.set('hbase.zookeeper.quorum', 'tsk1');
           config.set('hbase.zookeeper.property.clientPort', '2181');
           config.set('hbase.defaults.for.version.skip', 'true');
       }
       if (null == conn) {
           try {
               conn = ConnectionFactory.createConnection(config);
               admin = new HBaseAdmin(config);
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
       return ME;
   }
   public Table getTable(String tableName) {
       Table table = null;
       try {
           table = conn.getTable(TableName.valueOf(tableName));
       } catch (Exception ex) {
           ex.printStackTrace();
       }
       return table;
   }
   public void putAdd(String tableName, String rowKey, String cf, String column, Long value) {
       Table table = this.getTable(tableName);
       try {
           table.incrementColumnValue(rowKey.getBytes(), cf.getBytes(), column.getBytes(), value);
           System.out.println('OK!');
       } catch (IOException e) {
           e.printStackTrace();
       }
   }
//......以下省略
}


再寫(xiě)一個(gè)測(cè)試類KafkaRecHbase用來(lái)做spark-submit提交


package com.test.spark.spark_test;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
public class KafkaRecHbase {
   private static final Pattern SPACE = Pattern.compile(' ');
   public static void main(String[] args) throws Exception {
       Logger.getLogger('org').setLevel(Level.ERROR);
       SparkConf sparkConf = new SparkConf();
       sparkConf.setAppName('kafkaRecHbase');
       sparkConf.setMaster('local[2]');
       JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
       int numThreads = Integer.parseInt(args[3]);
       MapString, Integer> topicMap = new HashMap<>();
       String[] topics = args[2].split(',');
       for (String topic : topics) {
           topicMap.put(topic, numThreads);
       }
       JavaPairReceiverInputDStreamString, String> kafkaStream =
               KafkaUtils.createStream(ssc, args[0], args[1], topicMap);
       JavaDStreamString> lines = kafkaStream.map(Tuple2::_2);
       JavaDStreamString> lineStr = lines.map(line -> {
           if (null == line || line.equals('')) {
               return '';
           }
           String[] strs = SPACE.split(line);
           if (strs.length <>1) {
               return '';
           }
           try {
               for (String str : strs) {
                   HBaseHelper.getInstances().putAdd('myTestTable', str, 'info', 'wordCunts', 1l);
               }
               return 'strs:' + line;
           } catch (Exception ex) {
               System.out.println(line);
               return '報(bào)錯(cuò)了:' + ex.getMessage();
           }
       });
       lineStr.print();
       ssc.start();
       System.out.println('spark 啟動(dòng)?。?!');
       ssc.awaitTermination();
   }
}


編譯提交到服務(wù)器,執(zhí)行命令:


spark-submit --jars $(echo /usr/local/hbase-1.2.6/lib/*.jar | tr ' ' ',') --class com.test.spark.spark_test.KafkaRecHbase --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1 /opt/FileTemp/streaming/spark-test-0.1.1.jar tsk1:2181 test testTopic 1


沒(méi)報(bào)錯(cuò)的話執(zhí)行kafka的producer,輸入幾行數(shù)據(jù)在HBase內(nèi)就能看到結(jié)果了!


八、裝一個(gè)Flume實(shí)時(shí)采集Nginx日志寫(xiě)入Kafka


Flume是一個(gè)用來(lái)日志采集的框架,安裝和配置都比較簡(jiǎn)單,可以支持多個(gè)數(shù)據(jù)源和輸出,具體可以參考Flume的文檔,寫(xiě)的比較全傳送門(mén)


下載Flume并配置環(huán)境


wget http://mirrors.hust.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /usr/local
vim /etc/profile


export FLUME_HOME=/usr/local/apache-flume-1.8.0-bin/
export PATH=$FLUME_HOME/bin:$PATH


source /etc/profile


寫(xiě)一個(gè)Flume的配置文件在flume的conf目錄下:


vim nginxStreamingKafka.conf


agent1.sources=r1
agent1.channels=logger-channel
agent1.sinks=kafka-sink
agent1.sources.r1.type=exec
agent1.sources.r1.deserializer.outputCharset= UTF-8
agent1.sources.r1.command=tail -F /opt/data/nginxLog/nginxLog.log
agent1.channels.logger-channel.type=memory
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = flumeKafka
agent1.sinks.kafka-sink.brokerList = tsk1:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20
agent1.sources.r1.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel


kafka創(chuàng)建一個(gè)名為flumeKafka的topic用來(lái)接收,然后啟動(dòng)flume:


flume-ng agent --name agent1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/nginxStreamingKafka.conf -Dflume.root.logger=INFO,console


如果沒(méi)有報(bào)錯(cuò),F(xiàn)lume將開(kāi)始采集opt/data/nginxLog/nginxLog.log中產(chǎn)生的日志并實(shí)時(shí)推送給kafka,再按照上面方法寫(xiě)一個(gè)spark streaming的處理類進(jìn)行相應(yīng)的處理就好。


OK!全部搞定,然而~~~~就這樣就搞定了?NO?。?!這只是萬(wàn)里長(zhǎng)征的第一步!呵呵!


出處:http://www.tianshangkun.com/2018/01/26/基于Hadoop生態(tài)SparkStreaming的大數(shù)據(jù)實(shí)時(shí)流處理平臺(tái)的搭建/


本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊舉報(bào)。
打開(kāi)APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
當(dāng)我說(shuō)要做大數(shù)據(jù)工程師時(shí)他們都笑我,直到三個(gè)月后……
Hadoop等大數(shù)據(jù)學(xué)習(xí)相關(guān)電子書(shū)[共85本]
大數(shù)據(jù)架構(gòu)系統(tǒng)部署應(yīng)用介紹
大數(shù)據(jù)需要用到的知識(shí)
字節(jié)推薦算法終于開(kāi)源!吹爆!|數(shù)據(jù)倉(cāng)庫(kù)|原理|算法|編程
2018大數(shù)據(jù)培訓(xùn)學(xué)習(xí)路線圖(詳細(xì)完整版)
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長(zhǎng)圖 關(guān)注 下載文章
綁定賬號(hào)成功
后續(xù)可登錄賬號(hào)暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服