實(shí)時(shí)流處理簡(jiǎn)單概述:實(shí)時(shí)是說(shuō)整個(gè)流處理相應(yīng)時(shí)間較短,流式技算是說(shuō)數(shù)據(jù)是源源不斷的,沒有盡頭的。實(shí)時(shí)流處理一般是將業(yè)務(wù)系統(tǒng)產(chǎn)生的數(shù)據(jù)進(jìn)行實(shí)時(shí)收集,交由流處理框架進(jìn)行數(shù)據(jù)清洗,統(tǒng)計(jì),入庫(kù),并可以通過(guò)可視化的方式對(duì)統(tǒng)計(jì)結(jié)果進(jìn)行實(shí)時(shí)的展示。本文涉及到的框架或技術(shù)有 Flume,Logstash,kafka,Storm, SparkStreaming等。
實(shí)時(shí)流處理的的流程與技術(shù)選型 :
一、日志收集
由于業(yè)務(wù)系統(tǒng)一般是游離與流處理集群如SparkStreaming、Storm之外的,所以我們需要對(duì)業(yè)務(wù)系統(tǒng)的數(shù)據(jù)進(jìn)行實(shí)時(shí)收集。這就用到了日志收集框架,日志收集框架主要需要解決三個(gè)問(wèn)題:數(shù)據(jù)從哪兒來(lái),數(shù)據(jù)到哪兒去,實(shí)時(shí)收集。因?yàn)樵诹魈幚碇袨榱朔乐雇话l(fā)或激增流量壓垮流處理集群,通常將收集過(guò)后的數(shù)據(jù)輸出到kafka分布式消息系統(tǒng),然后流處理集群去消費(fèi)kafka中的數(shù)據(jù),下面介紹兩種常用的日志收集框架以及他們?nèi)绾螌?duì)接kafka.
1).Apache Flume
這是一個(gè)apache的頂級(jí)項(xiàng)目,所以他的域名為flume.apache.org, 下面是官網(wǎng)上的原理圖,F(xiàn)lume框架把每個(gè)收集任務(wù)都定義為一個(gè)Agent(這是一個(gè)JAVA進(jìn)程),他有三個(gè)基本組件Source、Channel、Sink。
source:收集數(shù)據(jù),可以對(duì)接各種常用數(shù)據(jù)源,如文件(exec source),kafka(kafka source),jms(java消息系統(tǒng))等。
channel:source組件把數(shù)據(jù)收集來(lái)以后,臨時(shí)存放在channel(管道)中,即channel組件在agent中是專門用來(lái)存放臨時(shí)數(shù)據(jù)的,并起到數(shù)據(jù)緩沖的作用。常用的channel有memory chanel 、jdbc chanel 、file channel 等等。
sink:sink組件是用于從channel中取數(shù)據(jù)并送到目的地的組件,目的地包括hdfs、logger、avro、thrift、file、hbase等。
其實(shí)flume的使用就是編寫配置文件,下面是使用flume將Nginx的日志對(duì)接kafka的配置文件,我們將該收集任務(wù)命名為
exec-memory-kafka,只需如下編寫:
#配置source、sink、channel
exec-memory-kafka.sources = exec-source #指定source (數(shù)據(jù)從哪兒來(lái)),可以指定多個(gè)數(shù)據(jù)源,用逗號(hào)分隔。
exec-memory-kafka.sinks = kafka-sink #指定sink(數(shù)據(jù)到哪兒去)
exec-memory-kafka.channels = memory-channel #指定channel
#source詳細(xì)配置
exec-memory-kafka.sources.exec-source.type = exec 執(zhí)行操作系統(tǒng)命令
exec-memory-kafka.sources.exec-source.command = sudo tail -F /var/log/nginx/access.log #監(jiān)控Nginx日志文件
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c #shell命令的前綴
#channel 詳細(xì)配置
exec-memory-kafka.channels.memory-channel.type = memory #內(nèi)存channel
#sink詳細(xì)配置
exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink #類型 為kafka sink
exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092 #kafaka服務(wù)的地址,多個(gè)用逗號(hào)分隔
exec-memory-kafka.sinks.kafka-sink.topic = test1 #指定主題
exec-memory-kafka.sinks.kafka-sink.batchSize = 5 #指定每多少條收集一次,這里是每5條發(fā)送一次。
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1 #使kafka對(duì)是否收到數(shù)據(jù)進(jìn)行確認(rèn),確保數(shù)據(jù)不會(huì)丟失
#為sink和source指定channel
exec-memory-kafka.sources.exec-source.channels = memory-channel
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel
編寫好配置文件后切換到flume的bin目錄下執(zhí)行:
flume-ng agent --conf 配置文件的目錄--conf-file 配置文件的全路徑--name exec-memory-kafka -Dflume.root.logger=INFO,console
即可開啟收集任務(wù)(進(jìn)程的方式)
2).ELK技術(shù)棧的Logstash
Logstash 是一個(gè)開源的數(shù)據(jù)收集引擎,它具有備實(shí)時(shí)數(shù)據(jù)傳輸能力。它可以統(tǒng)一過(guò)濾來(lái)自不同源的數(shù)據(jù),并按照開發(fā)者的制定的規(guī)范輸出到目的地。Logstash使用時(shí)也是編寫配置文件,下面是如何使用配置文件的方式將Nginx日志輸出到Kafka。
#定義數(shù)據(jù)源
input{
#這里是Nginx日志文件
file{
path =>"/var/log/nginx/access.log"
}
}
#數(shù)據(jù)發(fā)到哪,這里是kafka
output{
kafka{
topic_id => "test1" #指定topic
codec=>plain{
format=>"%{message}" #輸出的格式,這里表示只輸出消息,不輸出其他信息,如版本信息等。
}
bootstrap_servers=>"hadoop000:9092" #kafka服務(wù)的地址
batch_size=>1 #每幾條數(shù)據(jù)發(fā)送一次
}
}
切換到logstash的bin目錄,執(zhí)行以下命令即可開始收集任務(wù):
logstash -f 你的配置文件的位置。
二、kafka
kafka是一個(gè)分布式的流處理平臺(tái),在流處理中,我們通常使用他作為一個(gè)消息系統(tǒng)來(lái)使用,他是一個(gè)分布式、支持分區(qū)的(partition)、多副本的(replica),基于zookeeper協(xié)調(diào)的分布式消息系統(tǒng)。
kafka作為消息系統(tǒng)時(shí)相比其他消息中間件主要有4大優(yōu)勢(shì):
- 可擴(kuò)展性:可以通過(guò)增加broker的方式水平擴(kuò)展kafka集群
- 持久性、可靠性:消息被持久化到本地磁盤,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失
- 容錯(cuò)性:最大限度的容災(zāi),允許集群中節(jié)點(diǎn)失敗,包括主節(jié)點(diǎn),是高可用的。
- 高并發(fā)
幾個(gè)重要的角色:
Broker:Kafka節(jié)點(diǎn),一個(gè)Kafka節(jié)點(diǎn)就是一個(gè)broker,多個(gè)broker可以組成一個(gè)Kafka集群,一臺(tái)機(jī)器可以啟動(dòng)多個(gè)broker在不同的端口上。
Topic:消息系統(tǒng)中的主題,生產(chǎn)者和消費(fèi)者共同關(guān)注的部分。
Partition:topic物理上的分組,一個(gè)topic可以分為多個(gè)partition,每個(gè)partition是一個(gè)有序的隊(duì)列
Segment:partition物理上由多個(gè)segment組成,每個(gè)Segment存著message信息,以文件的形式
Producer :生產(chǎn)者, 生產(chǎn)message發(fā)送到topic
Consumer : 消費(fèi)者,訂閱topic消費(fèi)message, consumer作為一個(gè)線程來(lái)消費(fèi)
具體的使用可參照官網(wǎng),有詳細(xì)的介紹:
http://kafka.apache.org/quickstart
三、流處理框架
日志信息輸出到kafka后,需要使用流處理框架作為消費(fèi)者去消費(fèi)kafka中的數(shù)據(jù),下面是Storm和Spark的基本原理及其如何使用。
1 .Storm
apache的頂級(jí)項(xiàng)目,官網(wǎng)是storm.apache.org ,他是一個(gè)免費(fèi)的,開源的,分布式的實(shí)時(shí)計(jì)算系統(tǒng)。
Storm有很多用處:如實(shí)時(shí)計(jì)算分析,在線機(jī)器學(xué)習(xí),分布式RPC即DRPC,作為ETL工具等,
Storm特點(diǎn):處理速度快、可擴(kuò)展 、容災(zāi)與高可用的,能夠?qū)崿F(xiàn)高頻數(shù)據(jù)和大規(guī)模數(shù)據(jù)的實(shí)時(shí)處理。
Storm中幾個(gè)核心的概念:
Topologies:拓?fù)?,將整個(gè)流處理流程串起來(lái),每個(gè)storm應(yīng)用程序都需要定義Toplogies,由spout和bolt組成的。
Streams:消息流,抽象概念,由沒有邊界的Tuple構(gòu)成
Spouts:消息流的源頭,Topology的消息生產(chǎn)者。產(chǎn)生數(shù)據(jù)的組件,比如我們要對(duì)接kafka,我們就要定義一個(gè)kafka Spout
Bolts:消息處理單元,可以做過(guò)濾、聚合、查詢/寫數(shù)據(jù)庫(kù)等操作。
Tuple:具體的數(shù)據(jù),傳遞的基本單元。
Storm架構(gòu):
類似于Hadoop的架構(gòu),也是主從架構(gòu)(Master/Slave),所有節(jié)點(diǎn)都是無(wú)狀態(tài)的,在他們上面的信息(元數(shù)據(jù))會(huì)存儲(chǔ)在zookeeper中
Nimbus: 集群的主節(jié)點(diǎn),負(fù)責(zé)任務(wù)(task)的指派和分發(fā)、資源的分配
Supervisor: 從節(jié)點(diǎn),可以啟動(dòng)多個(gè)Worker,可以通過(guò)配置來(lái)指定一個(gè)Topo運(yùn)行在多個(gè)Worker之上,也可以通過(guò)配置來(lái)指定集群的從節(jié)點(diǎn)(負(fù)責(zé)干活的),Supervisor節(jié)點(diǎn)負(fù)責(zé)執(zhí)行任務(wù)的具體部分,啟動(dòng)和停止自己管理的Worker進(jìn)程等,一個(gè)Supervisor默認(rèn)啟動(dòng)4個(gè)Worker進(jìn)程
Worker: 運(yùn)行具體組件邏輯(Spout/Bolt)的進(jìn)程,這是一個(gè)進(jìn)程,一個(gè)Work進(jìn)程只為一個(gè)Topology服務(wù)。
Task: Worker中每一個(gè)Spout和Bolt的線程稱為一個(gè)Task,他是最終運(yùn)行spout或者bolt代碼的最小執(zhí)行單元
executor:是一個(gè)被worker進(jìn)程啟動(dòng)的單獨(dú)線程,Spout和bolt和共享一個(gè)executor,而且一個(gè)executor可以運(yùn)行多個(gè)Task。
下面是各個(gè)組件職責(zé)的示意圖:
編碼時(shí)幾個(gè)核心的角色:
1). ISpout:核心接口(interface),負(fù)責(zé)將數(shù)據(jù)發(fā)送到topology中去處理,Storm會(huì)跟蹤Spout發(fā)出去的tuple的,通過(guò)ack/fail機(jī)制,對(duì)Spout發(fā)送成功或失敗時(shí)做處理,沒條數(shù)據(jù)即Tuple都有自己的message id,而且ack/fail/nextTuple是在同一個(gè)線程中執(zhí)行的,所以不用考慮線程安全方面。
核心方法
open: 初始化操作
close: 資源釋放操作
nextTuple: 發(fā)送數(shù)據(jù)
ack: tuple處理成功,storm會(huì)反饋給spout一個(gè)成功消息
fail:tuple處理失敗,storm會(huì)發(fā)送一個(gè)消息給spout,處理失敗
實(shí)現(xiàn)類:
public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
public interface IRichSpout extends ISpout, IComponent {}
我們定義Spout時(shí)只需要繼承BaseRichSpout這個(gè)類,并實(shí)現(xiàn)其中的方法即可。
2).IComponent接口
概述:public interface IComponent extends Serializable
他為topology中所有可能的組件提供公用的方法
如 void declareOutputFields(OutputFieldsDeclarer declarer);
此方法用于聲明當(dāng)前Spout/Bolt發(fā)送的tuple的名稱,使用OutputFieldsDeclarer配合使用
實(shí)現(xiàn)類:
public abstract class BaseComponent implements IComponent
IBolt接口:
概述職責(zé):接收tuple處理,并進(jìn)行相應(yīng)的處理(filter/join/....),IBolt會(huì)在一個(gè)運(yùn)行的機(jī)器上創(chuàng)建,使用Java序列化它,然后提交到主節(jié)點(diǎn)(nimbus)上去執(zhí)行,nimbus會(huì)啟動(dòng)worker來(lái)反序列化,調(diào)用prepare方法,然后才開始處理tuple處理
方法:
prepare:初始化
execute:處理一個(gè)tuple數(shù)據(jù),tuple對(duì)象中包含了元數(shù)據(jù)信息
cleanup:shutdown之前的資源清理操作
實(shí)現(xiàn)類:
public abstract class BaseRichBolt extends BaseComponent implements IRichBolt {
public interface IRichBolt extends IBolt, IComponent
RichShellBolt
我們定義Bolt時(shí)只需繼承BaseRichBolt并實(shí)現(xiàn)其中的方法即可。
以下是Storm對(duì)kafka的消息進(jìn)行實(shí)時(shí)打印的代碼實(shí)現(xiàn)。Storm官網(wǎng)有許多對(duì)接主流框架的介紹,引入所需jar包,就可以使用寫好的KafkaSpout,而無(wú)需自己定義KafkaSpout類了。
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-kafka</artifactId>
- <version>${storm.version}</version>
- </dependency>
- public class StormKafkaTopology {
- public static class LogBolt extends BaseRichBolt {
- private OutputCollector outputCollector;
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
- this.outputCollector = outputCollector;
- }
- public void execute(Tuple tuple) {
- try {
- byte[] bytes = tuple.getBinaryByField("bytes");
- String value = new String(bytes);
- System.out.println("value :" + value);
- this.outputCollector.ack(tuple);
- } catch (Exception e) {
- this.outputCollector.fail(tuple);
- }
- }
- //無(wú)后續(xù)bolt,無(wú)需聲明
- public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
- }
- }
- public static void main(String[] args) {
- TopologyBuilder builder = new TopologyBuilder();
- //kafka的配置
- String topicName = "project_topic";
- BrokerHosts hosts = new ZkHosts("hadoop000:2181");
- SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
- //從上次收集的位置開始,而不是從頭開始
- spoutConfig.startOffsetTime=kafka.api.OffsetRequest.LatestTime();
- //創(chuàng)建kafkaSpout
- KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
- builder.setSpout("KafkaSpout", kafkaSpout);
- //設(shè)置Bolt
- builder.setBolt("LogBolt", new LogBolt()).shuffleGrouping("KafkaSpout");
- //本地運(yùn)行Storm任務(wù)
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("StormKafkaTopology", new Config(), builder.createTopology());
- }
- }}
2.SparkStreaming
官網(wǎng)上的介紹如下:
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map
, reduce
, join
and window
. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
即:Spark Streaming 是Spark核心API的一個(gè)擴(kuò)展,可以實(shí)現(xiàn)高吞吐量的、具備容錯(cuò)機(jī)制的實(shí)時(shí)流數(shù)據(jù)的處理。支持從多種數(shù)據(jù)源獲取數(shù)據(jù),包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,從數(shù)據(jù)源獲取數(shù)據(jù)之后,可以使用諸如map、reduce、join和window等高級(jí)函數(shù)進(jìn)行復(fù)雜算法的處理。最后還可以將處理結(jié)果存儲(chǔ)到文件系統(tǒng),數(shù)據(jù)庫(kù)和現(xiàn)場(chǎng)儀表盤。在“One Stack rule them all”的基礎(chǔ)上,還可以使用Spark的其他子框架,如集群學(xué)習(xí)、圖計(jì)算等,對(duì)流數(shù)據(jù)進(jìn)行處理。
Spark嚴(yán)格意義上來(lái)說(shuō)并不能算實(shí)時(shí)流處理,他粗粒度的工作原理為:將實(shí)時(shí)接收的數(shù)據(jù),根據(jù)一定的時(shí)間間隔拆成一批批的數(shù)據(jù),具體來(lái)說(shuō)是一批批RDD(分布式彈性數(shù)據(jù)集,Spark中的核心概念),然后通過(guò)SparkEngine來(lái)處理這些數(shù)據(jù),可能是一些transformation和action操作,最后得到一批批的處理結(jié)果。
Strom和SparkStreaming的對(duì)比:
1).Strom是真正意義上的的流處理,時(shí)延相比SparkStreaming較低,而SparkStremming是將接受的實(shí)時(shí)流數(shù)據(jù),按照指定的時(shí)間間隔拆成一個(gè)個(gè)RDD,在每個(gè)RDD中以批處理的形式處理數(shù)據(jù)。本質(zhì)上還是批處理。
2).Storm會(huì)通過(guò)messageId的方式全局追蹤和記錄每一條記錄,并通過(guò)ack/fail機(jī)制確保每條數(shù)據(jù)至少被處理一次(也可能是多次),而SparkStream應(yīng)用程序只需要批處理級(jí)別對(duì)記錄進(jìn)行追蹤,他能保證每個(gè)批處理記錄僅僅被處理一次。
3).由于SparkStreming是運(yùn)行在Spark平臺(tái)上的無(wú)需單獨(dú)安裝,可以和批處理SparkSql,機(jī)器學(xué)習(xí)等其他其框架結(jié)合起來(lái)使用。
下面使用scala語(yǔ)言將SparkStreming對(duì)接kafka并對(duì)圖書點(diǎn)擊量進(jìn)行實(shí)時(shí)統(tǒng)計(jì)的應(yīng)用代碼:將kafka中收集到的日志進(jìn)行清洗,并轉(zhuǎn)換成ClikcLog對(duì)象,并實(shí)時(shí)統(tǒng)計(jì)的結(jié)果轉(zhuǎn)化成BookClick對(duì)象并寫入Hbase,Nginx日志結(jié)構(gòu)如下:
192.168.126.1 - - [2017-12-02 19:20:28] "GET /books/1 HTTP/1.1" 200 2403 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36" "-"
- object BookCount {
- def main(args: Array[String]): Unit = {
- //以參數(shù)的形式運(yùn)行SparkStreming應(yīng)用程序 四個(gè)參數(shù)為zk地址 ,用戶組, 主題,線程數(shù)
- if (args.length != 4) {
- System.err.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
- }
- val Array(zkQuorum, group, topics, numThreads) = args
- val sparkConf = new SparkConf()
- //構(gòu)造StreamingContext
- val ssc = new StreamingContext(sparkConf, Seconds(5))
- val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
- // Spark Streaming對(duì)接Kafka
- val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
- val logs = messages.map(_._2)
- // 192.168.126.1 - - [2017-12-02 19:20:28] "GET /books/1 HTTP/1.1" 200 2403 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36" "-"
- // 0 1 2 3 4 5 6 7 8
- val cleanData = logs.map(line => {
- val infos = line.split(" ")
- val url = infos(6)
- var bookId = 0
- val time = infos(3).substring(1) + " " + infos(4).substring(0, 7)
- if (url.startsWith("/books/")) {//只關(guān)注以books/開頭的請(qǐng)求
- bookId = url.split("/")(2).toInt
- }
- ClickLog(infos(0), TimeUtil.newTime(time), bookId, infos(8).toInt)
- }).filter(clickLog => clickLog.bookId != 0)//為零表示不滿足要求,忽略。
- //cleanData.print()
- cleanData.map(x => {
- (x.time.substring(0, 8) + "_" + x.bookId, 1)
- }).reduceByKey(_ + _).foreachRDD(rdd => {
- rdd.foreachPartition(record => {
- val list= new ListBuffer[BookClick]
- record.foreach(pair => {
- list.append(BookClick(pair._1,pair._2))
- })
- BookClickDao.put(list)
- })
- })
- ssc.start()
- ssc.awaitTermination()
- }
- }
case class ClickLog(ip:String,time:String,bookId:Int,statusCode:Int)
case class BookClick(day_id:String,click_count:Int)
- object BookClickDao {
- val tableName = "book_clickcount"
- val cf = "info"
- val colume = "click_count"
- def put(list: ListBuffer[BookClick]): Unit = {
- val table = HbaseUtils.getInstance().getTable(tableName)
- for (ele <- list) {
- table.incrementColumnValue(Bytes.toBytes(ele.day_id), Bytes.toBytes(cf), Bytes.toBytes(colume), ele.click_count)
- }
- }
- def get(day_id: String): Long = {
- val table = HbaseUtils.getInstance().getTable(tableName)
- val get = new Get(Bytes.toBytes(day_id))
- val value = table.get(get).getValue(cf.getBytes, colume.getBytes)
- if (value == null)
- 0l
- else
- Bytes.toLong(value)
- }
- }
- object TimeUtil {
- val YYYYMMDDHHMMSS_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
- val TARGET_TIME = FastDateFormat.getInstance("yyyyMMddHHmmss")
- def passTime(time: String)={
- YYYYMMDDHHMMSS_FORMAT.parse(time)
- }
- def newTime(time:String)={
- TARGET_TIME.format(passTime(time))
- }
- def main(args: Array[String]): Unit = {
- println(newTime("2017-12-02 19:13:25"))
- }
- }
因?yàn)榱魈幚砜蚣鼙旧聿痪邆浯鎯?chǔ)能力,最后需要將統(tǒng)計(jì)結(jié)果入庫(kù),并可通過(guò)百度的Echart或者阿里的DataV等數(shù)據(jù)可視化工具,定義sql和時(shí)間間隔,對(duì)統(tǒng)計(jì)結(jié)果進(jìn)行實(shí)時(shí)的展示。
聯(lián)系客服