免费视频淫片aa毛片_日韩高清在线亚洲专区vr_日韩大片免费观看视频播放_亚洲欧美国产精品完整版

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開通VIP
如何使用Redis流和Apache Spark處理實(shí)時(shí)數(shù)據(jù)?
AI 前線導(dǎo)讀:將 Redis 流作為流數(shù)據(jù)庫,Apache Spark 作為數(shù)據(jù)處理引擎,兩者怎樣共同部署才能做到最佳搭配?

本文要點(diǎn)
  • Apache Spark 的流框架(Structured Streaming)為數(shù)據(jù)流帶來了 SQL 查詢功能,讓用戶可以實(shí)時(shí)、可擴(kuò)展地處理數(shù)據(jù)。

  • Redis 流(Redis Stream)是 Redis 5.0 新引入的數(shù)據(jù)結(jié)構(gòu),能夠以亞毫秒級的延遲高速收集、保存和分發(fā)數(shù)據(jù)。

  • 用戶集成 Redis 流和流框架后就能簡化連續(xù)應(yīng)用程序(continuous application)的擴(kuò)展工作。

  • 開源的 Spark-Redis 庫將 Apache Spark 與 Redis 連接起來。該庫為 Redis 數(shù)據(jù)結(jié)構(gòu)提供 RDD 和數(shù)據(jù)幀 API,使用戶可以將 Redis 流用作流框架的數(shù)據(jù)源。

流框架是 Apache Spark 2.0 新引入的一項(xiàng)功能,在業(yè)界和數(shù)據(jù)工程社區(qū)中引起了很大關(guān)注。流框架 API 構(gòu)建于 Spark SQL 引擎之上,為流數(shù)據(jù)提供類似 SQL 的界面。

早期的 Apache Spark 以微批處理方式處理流框架查詢,延遲大約為 100 毫秒。

去年的 2.3 版本引入了低延遲(1 毫秒)的“連續(xù)處理”,進(jìn)一步推動(dòng)了流框架的應(yīng)用。

為了讓 Spark 保持高速的連續(xù)處理狀態(tài),你需要使用像 Redis 這樣的高速流數(shù)據(jù)庫來支持它。

Redis 開源內(nèi)存數(shù)據(jù)庫以其高速度和亞毫秒級延遲聞名于世。最近 Redis 5.0 新推出了一種名為 Redis 流的數(shù)據(jù)結(jié)構(gòu),使 Redis 能夠在多個(gè)生產(chǎn)者和消費(fèi)者之間消費(fèi)、保存和分發(fā)流數(shù)據(jù)。

現(xiàn)在的問題是,將 Redis 流作為流數(shù)據(jù)庫,Apache Spark 作為數(shù)據(jù)處理引擎,兩者共同部署,怎樣才能做到最佳搭配?

用 Scala 編寫的 Spark-Redis 庫就集成了 Apache Spark 和 Redis,使用它可以: 

  • 在 Redis 中以 RDD 的形式讀寫數(shù)據(jù)

  • 在 Redis 中以數(shù)據(jù)幀的形式讀寫數(shù)據(jù)(例如,它允許將 Spark SQL 表映射到 Redis 數(shù)據(jù)結(jié)構(gòu))

  • 使用 Redis 流作為流框架的數(shù)據(jù)源

  • 在流框架之后將 Redis 實(shí)現(xiàn)為接收器

本文中我將介紹一個(gè)真實(shí)場景,并指導(dǎo)你如何使用 Redis 和 Apache Spark 實(shí)時(shí)處理流數(shù)據(jù)。

模擬場景:計(jì)算實(shí)時(shí)點(diǎn)擊

假設(shè)我們是一家廣告公司,在熱門網(wǎng)站上投放廣告。我們根據(jù)社交媒體上的熱門圖片制作包含流行話題梗的動(dòng)圖,并將其作為廣告投放出去。為了最大化利潤,我們必須識別出能獲得病毒式傳播或贏得更多點(diǎn)擊次數(shù)的資產(chǎn),這樣就能加大它們的投放力度了。

我們的大部分資產(chǎn)傳播期很短,所以能實(shí)時(shí)處理點(diǎn)擊的話,我們就能快速生成傳播趨勢圖,這對業(yè)務(wù)至關(guān)重要。我們理想中的流數(shù)據(jù)解決方案必須記錄所有廣告點(diǎn)擊并實(shí)時(shí)處理,然后計(jì)算每項(xiàng)資產(chǎn)的實(shí)時(shí)點(diǎn)擊次數(shù)。以下是設(shè)計(jì)思路:

輸入

對于每次點(diǎn)擊,我們的數(shù)據(jù)提取方案(圖 1 中的方框 1)將資產(chǎn) ID 和廣告費(fèi)用放在 Redis 流中: 

XADD clicks * asset [asset id] cost [actual cost]

例如: 

XADD clicks * asset aksh1hf98qw7tt9q7 cost 29
輸出

在圖 1 中的方框 2 部分處理數(shù)據(jù)之后,我們的結(jié)果會(huì)存儲(chǔ)在數(shù)據(jù)存儲(chǔ)區(qū)中。數(shù)據(jù)查詢方案(圖 1 中的方框 3)為數(shù)據(jù)提供了一個(gè) SQL 接口,我們可以用它查詢最近幾分鐘的最高點(diǎn)擊次數(shù): 

select asset, count from clicks order by count desc

asset count
----------------- -----
aksh1hf98qw7tt9q7 2392
i2dfb8fg023714ins 2010
jsg82t8jasvdh2389 1938
構(gòu)建解決方案

現(xiàn)在我們已經(jīng)定義好了業(yè)務(wù)需求,接下來探討如何使用 Redis 5.0 和 Apache Spark 2.4 構(gòu)建其解決方案。在本文中我用的是 Scala 編程語言,但你也可以在 Java 或 Python 中使用 Spark-Redis 庫。

這張流程圖看起來非常簡單:首先系統(tǒng)將數(shù)據(jù)提取到 Redis 流,然后 Redis 流將數(shù)據(jù)作為 Spark 進(jìn)程消費(fèi),并將結(jié)果聚合傳回 Redis,最后使用 Spark-SQL 接口在 Redis 中查詢結(jié)果。 

  1. 數(shù)據(jù)提取:我選擇用 Redis 流提取數(shù)據(jù),因?yàn)樗?Redis 中的內(nèi)置數(shù)據(jù)結(jié)構(gòu),每秒可處理超過一百萬次讀寫操作。此外它還可以根據(jù)時(shí)間自動(dòng)對數(shù)據(jù)排序,并支持簡化數(shù)據(jù)讀取方式的消費(fèi)者組。Spark-Redis 庫支持將 Redis 流作為數(shù)據(jù)源,因此它完全符合我們對流式數(shù)據(jù)庫使用 Apache Spark 引擎的需求。 

  2. 數(shù)據(jù)處理:Apache Spark 中的流框架 API 是我們處理數(shù)據(jù)的絕佳選擇,而 Spark-Redis 庫使我們能夠?qū)⒌竭_(dá) Redis 流的數(shù)據(jù)轉(zhuǎn)換為數(shù)據(jù)幀。使用流框架時(shí),我們可以用微批處理或 Spark 的連續(xù)處理模式運(yùn)行查詢。我們還可以開發(fā)一個(gè)自定義的“編寫器”來將數(shù)據(jù)寫入指定目的地。如圖 2 所示,我們將使用哈希數(shù)據(jù)結(jié)構(gòu)將輸出寫入 Redis。 

  3. 數(shù)據(jù)查詢:Spark-Redis 庫允許你將本機(jī) Redis 數(shù)據(jù)結(jié)構(gòu)映射為數(shù)據(jù)幀。我們可以聲明一個(gè)將列映射到哈希數(shù)據(jù)結(jié)構(gòu)特定鍵的“臨時(shí)表”,并且由于 Redis 的速度非??欤舆t在亞毫秒級別,我們可以使用 Spark-SQL 獲得實(shí)時(shí)查詢能力。

之后我將逐個(gè)介紹如何開發(fā)并運(yùn)行解決方案的各個(gè)組件。在那之前,我們先用適當(dāng)?shù)墓ぞ邅沓跏蓟_發(fā)環(huán)境。

尋找合適的開發(fā)工具

在我們的示例中,我們將使用 Homebrew 包管理器在 macOS 上下載和安裝軟件,你也可以根據(jù)你操作系統(tǒng)的情況選擇其他包管理器。 

  1. Redis 5.0或更高版本: 首先,我們需要在環(huán)境中下載并安裝 Redis 5.x。舊版本的 Redis 不支持 Redis 流。

在 Homebrew 上,我們用下面的命令安裝并啟動(dòng) Redis 5.0: 

$ brew install Redis
$ brew services start Redis

如果你用的還是舊版 Redis,可以用下面的命令升級它: 

$ brew upgrade Redis
  1. Apacke Spark 2.3或更高版本: 接下來我們從官方網(wǎng)站下載并安裝 Apache Spark,或者使用 Homebrew 安裝:

    $ brew install apache-spark
  2. Scala 2.12.8或更高版本:Scala 也是一樣的操作:

    $ brew install scala
  3. Apache Maven:我們需要用 Maven 來構(gòu)建 Spark-Redis 庫。

    $ brew install maven
  4. JDK 1.8或更高版本:我們可以使用下面的命令從甲骨文網(wǎng)站或 Homebrew 下載并安裝這個(gè) JDK。對于最新版本的 JDK,我們需要用 java 替換 java8。

    $ brew cask install java8
  5. Spark-Redis:這是我們解決方案的核心部分,這里從 GitHub 下載庫并構(gòu)建軟件包,如下所示:

    $ git clone https://github.com/RedisLabs/spark-redis.git
    $ cd spark-redis
    $ mvn clean package -DskipTests

這會(huì)在./target/ 目錄下加入 spark-redis-。在我的設(shè)置中這個(gè)文件是 spark-redis-2.3.1-SNAPSHOT-jar-with-dependencies.jar

  1. SBT 1.2.8或更高版本:SBT 是一個(gè) Scala 構(gòu)建工具,可簡化管理和構(gòu)建 Scala 文件的工作。

    $ brew install sbt
  2. 開發(fā)環(huán)境:最后該設(shè)置文件夾結(jié)構(gòu)并構(gòu)建文件了。本示例中我們將把程序代碼放在“scala”目錄下。

    $ mkdir scala
    $ cd ./scala

使用以下內(nèi)容創(chuàng)建一個(gè)新文件 build.sbt: 

name := 'RedisExample'

version := '1.0'

scalaVersion := '2.12.8' 

val sparkVersion = '2.4.0'

libraryDependencies ++= Seq(
 'org.apache.spark' %% 'spark-core' % sparkVersion,
 'org.apache.spark' %% 'spark-sql' % sparkVersion,
 'org.apache.spark' %% 'spark-catalyst' % sparkVersion
)

初始化目錄。用以下命令初始化包目錄: 

$ mkdir ./src/main/scala/
$ mkdir ./lib
$ sbt package

spark-redis-復(fù)制到 lib 目錄。

構(gòu)建我們的點(diǎn)擊計(jì)數(shù)解決方案

如架構(gòu)部分所述,我們的解決方案包含三個(gè)部分:數(shù)據(jù)提取組件、Spark 引擎內(nèi)的數(shù)據(jù)處理器和數(shù)據(jù)查詢接口。在本節(jié)中我將詳細(xì)說明這三個(gè)部分并組合出一個(gè)有效的解決方案。

  1. 提取 Redis 流

Redis 流是一種僅附加數(shù)據(jù)結(jié)構(gòu)。假設(shè) Apache Spark 的連續(xù)處理單元將消費(fèi)這些數(shù)據(jù),我們可以將消息數(shù)限制為一百萬。稍微修改一下前面提到的命令: 

XADD clicks MAXLEN ~ 1000000 * asset aksh1hf98qw7tt9q7 cost 29

大多數(shù)流行的 Redis 客戶端都支持 Redis 流,因此根據(jù)你的編程語言,你可以選擇適用 Python 的 redis-py、適用 Java 的 Jedis 或 Lettuce、適用 Node.js 的 node-redis 等等。

  1. 數(shù)據(jù)處理

這一部分分為三個(gè)小節(jié):

  • 從 Redis 流讀取和處理數(shù)據(jù)

  • 將結(jié)果存儲(chǔ)在 Redis 中

  • 運(yùn)行程序

    1. 從 Redis 流讀取數(shù)據(jù)

要在 Spark 中從 Redis 流讀取數(shù)據(jù),我們需要明白怎樣連接到 Redis,以及 Redis 流中數(shù)據(jù)的 Schema 結(jié)構(gòu)。

為了連接到 Redis,我們必須為 Redis 創(chuàng)建一個(gè)帶有連接參數(shù)的新 Spark 會(huì)話(SparkSession): 

val spark = SparkSession
.builder()
.appName('redis-example')
.master('local[*]')
.config('spark.redis.host', 'localhost')
.config('spark.redis.port', '6379')
.getOrCreate()

設(shè)置 Schema 結(jié)構(gòu)時(shí),我們用“clicks”命名流,并為“stream.keys”設(shè)置一個(gè)“clicks”的選項(xiàng)。由于每個(gè)流元素都包含一項(xiàng)資產(chǎn)以及與之相關(guān)的成本,因此我們將創(chuàng)建一個(gè)包含兩個(gè) StructField 的數(shù)組的 StructType——一個(gè)用于“asset”,另一個(gè)用于“cost”,如下所示: 

val clicks = spark
    .readStream
    .format('redis')
    .option('stream.keys','clicks')
    .schema(StructType(Array(
    StructField('asset', StringType),
    StructField('cost', LongType)
    )))
    .load()

在第一個(gè)程序中我們對每個(gè)資產(chǎn)的點(diǎn)擊次數(shù)感興趣。為此創(chuàng)建一個(gè)數(shù)據(jù)幀,其中包含按資產(chǎn)計(jì)數(shù)分組的數(shù)據(jù): 

val byasset = clicks.groupBy('asset').count

最后一步是啟動(dòng)流框架查詢: 

val query = byasset
    .writeStream
    .outputMode('update')
    .foreach(clickWriter)
    .start()

注意這里我們使用自己的 ForeachWriter 將結(jié)果寫回 Redis。如果要將輸出轉(zhuǎn)到控制臺(tái),可以將查詢寫成: 

val query = byasset
    .writeStream
    .outputMode('update')
    .format('console')
    .start()

對于 連續(xù)處理 而言,我們希望在查詢中添加'trigger'命令:.trigger(Trigger.Continuous('1 second'))。trigger 命令不適用于聚合查詢,因此我們無法把它插入這個(gè)示例。

下面是完整的程序代碼。它會(huì)從 Redis 流讀取新的點(diǎn)擊數(shù)據(jù)并使用 Spark 的流框架 API 處理。如果你想在自己的環(huán)境中嘗試,請將程序保存在 src/main/scala 下,命名為 ClickAnalysis.scala。(如果你的 Redis 服務(wù)器不是在端口 6379 上本地運(yùn)行的,請根據(jù)具體情況設(shè)置連接參數(shù)。) 

// Program: ClickAnalysis.scala
//
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.redislabs.provider.redis._

object ClickAnalysis {
def main(args: Array[String]): Unit = {
    val spark = SparkSession
    .builder()
    .appName('redis-example')
    .master('local[*]')
    .config('spark.redis.host', 'localhost')
    .config('spark.redis.port', '6379')
    .getOrCreate()

val clicks = spark
    .readStream
    .format('redis')
    .option('stream.keys','clicks')
    .schema(StructType(Array(
    StructField('asset', StringType),
    StructField('cost', LongType)
    )))
    .load()
val byasset = clicks.groupBy('asset').count

val clickWriter : ClickForeachWriter =
    new ClickForeachWriter('localhost','6379')

val query = byasset
    .writeStream
    .outputMode('update')
    .foreach(clickWriter)
    .start()

query.awaitTermination()

} // End main
} //End object
  1. 將結(jié)果存儲(chǔ)在 Redis 中

為了將結(jié)果寫回 Redis,我們可以開發(fā)一個(gè)名為 ClickForeachWriter 的自定義 ForeachWriter。它會(huì)擴(kuò)展 ForeachWriter,并使用 Redis 的 Java 客戶端 Jedis 連接到 Redis 上。下面是完整的程序代碼,保存為 ClickForeachWriter.scala: 

// Program: ClickForeachWriter.scala
//
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.Row
import redis.clients.jedis.Jedis

class ClickForeachWriter(p_host: String, p_port: String) extends 
ForeachWriter[Row]{

val host: String = p_host
val port: String = p_port

var jedis: Jedis = _

def connect() = {
    jedis = new Jedis(host, port.toInt)
}

    override def open(partitionId: Long, version: Long):
Boolean = {
    return true
}

override def process(record: Row) = {
    var asset = record.getString(0);
    var count = record.getLong(1);
    if(jedis == null){
    connect()
}

jedis.hset('click:'+asset, 'asset', asset)
jedis.hset('click:'+asset, 'count', count.toString)
jedis.expire('click:'+asset, 300)
    }

override def close(errorOrNull: Throwable) = {
    }
}

在這部分程序中有一點(diǎn)需要注意:它將結(jié)果存儲(chǔ)在哈希數(shù)據(jù)結(jié)構(gòu)中,其鍵遵循語法“click:

  1. 運(yùn)行程序

在我們運(yùn)行之前首先需要編譯程序。轉(zhuǎn)到主目錄(我們存儲(chǔ) build.sbt 的目錄)運(yùn)行命令: 

$ sbt package

我們的程序應(yīng)該能順利編譯通過,沒有錯(cuò)誤。如果出現(xiàn)了錯(cuò)誤,請修復(fù)它們并重新運(yùn)行 sbt 包。編譯完成后,在同一目錄中運(yùn)行以下命令來啟動(dòng)程序: 

spark-submit --class ClickAnalysis --jars 
./lib/spark-redis-2.3.1-SNAPSHOT-jar-with-dependencies.jar 
--master local[*] ./target/scala-2.12/redisexample_2.12-1.0.jar

如果你不喜歡調(diào)試消息,可以停止程序(按 ctrl 加 c)并編輯 /usr/local/Cellar/apache-spark/2.4.0/libexec/conf/(或 log4j.properties 文件存儲(chǔ)的目錄)下的 log4j.properties,并將 log4j.rootCategory 更改為 WARN,如下所示: 

log4j.rootCategory=WARN, console

該程序?qū)⒆詣?dòng)從 Redis 流中提取消息。如果 Redis 流中沒有消息,它將異步偵聽新消息。我們可以在新的控制臺(tái)中啟動(dòng) redis-cli 并向 Redis 流添加一條消息,以測試它是否在正常消費(fèi)消息: 

$ redis-cli
redis-cli> XADD clicks * asset test cost 100

一切順利的話,我們應(yīng)該能在哈希數(shù)據(jù)結(jié)構(gòu)中讀取結(jié)果:

redis-cli> hgetall click:test
1) 'asset'
2) 'test'
3) 'count'
4) '1'
  1. 查詢數(shù)據(jù):將 Redis 數(shù)據(jù)讀取為數(shù)據(jù)幀

我們解決方案的最后一個(gè)組件實(shí)際上為 Redis 數(shù)據(jù)提供了一個(gè) SQL 接口。通過 SQL 命令讀取數(shù)據(jù)又是一個(gè)兩步過程:首先,我們?yōu)?Redis 數(shù)據(jù)定義 SQL schema;其次,我們運(yùn)行 SQL 命令。

但在此之前,我們需要從主目錄上在控制臺(tái)運(yùn)行 spark-sql,如下所示: 

$ spark-sql --jars 
./lib/spark-redis-2.3.1-SNAPSHOT-jar-with-dependencies.jar

然后會(huì)轉(zhuǎn)到 spark-sql 提示符下: 

spark-sql>

現(xiàn)在我們要為 Redis 哈希數(shù)據(jù)結(jié)構(gòu)中存儲(chǔ)的數(shù)據(jù)定義 SQL schema。如前所述,我們將每個(gè)資產(chǎn)的數(shù)據(jù)存儲(chǔ)在由鍵:click:

spark-sql> CREATE TABLE IF NOT EXISTS clicks(asset STRING, count 
INT) USING org.apache.spark.sql.redis OPTIONS (table 'click')

此命令創(chuàng)建一個(gè)名為“clicks”的新表視圖。它使用 Spark-Redis 庫中指定的指令將“asset”和“count”列映射到哈希結(jié)構(gòu)中的對應(yīng)字段?,F(xiàn)在我們可以運(yùn)行查詢: 

spark-sql> select * from clicks;
test 1
Time taken: 0.088 seconds, Fetched 1 row(s)

如果要以編程方式運(yùn)行 SQL 查詢,請參閱 Apache Spark 提供的有關(guān)如何使用 ODBC/JDBC 驅(qū)動(dòng)程序連接到 Spark 引擎的文檔。

我們的成果是什么?

在本文中,我演示了如何使用 Redis 流作為 Apache Spark 引擎的數(shù)據(jù)源,介紹了 Redis 流是怎樣為流框架用例提供支持的。我還展示了如何使用 Apache Spark 中的數(shù)據(jù)幀 API 讀取 Redis 數(shù)據(jù),并融合流框架和數(shù)據(jù)幀的理念說明了 Spark-Redis 庫可以實(shí)現(xiàn)的功能。

Redis 流簡化了高速收集和分發(fā)數(shù)據(jù)的任務(wù)。將其與 Apache Spark 中的流框架相結(jié)合,可以支持需要實(shí)時(shí)計(jì)算的各種解決方案,包括物聯(lián)網(wǎng)、欺詐檢測、人工智能和機(jī)器學(xué)習(xí)、實(shí)時(shí)分析等。

作者介紹

Roshan Kumar 是 Redis Labs 的高級產(chǎn)品經(jīng)理。他在軟件開發(fā)和技術(shù)領(lǐng)域的產(chǎn)品管理方面擁有豐富的經(jīng)驗(yàn)。他曾在惠普公司和一些成功的硅谷創(chuàng)業(yè)公司工作。他擁有計(jì)算機(jī)科學(xué)學(xué)士學(xué)位和美國加利福尼亞州圣克拉拉大學(xué)的 MBA 學(xué)位。

本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊舉報(bào)。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
大數(shù)據(jù)需要用到的知識
用Spark做數(shù)據(jù)分析是怎樣一種體驗(yàn)?
共筑Spark大數(shù)據(jù)引擎的七大工具
Apache 兩個(gè)開源項(xiàng)目比較:Flink vs Spark
【小白視角】大數(shù)據(jù)基礎(chǔ)實(shí)踐(七) Spark的基本操作
IBM專家深入淺出講解Spark2.0
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服