Apache Spark 2.0 introduced SparkSession, which gives users a single, unified entry point to Spark's functionality and lets them write Spark programs with the DataFrame and Dataset APIs. Most importantly, it reduces the number of concepts users have to deal with, making it much easier to interact with Spark.
In this article we will look at how to use SparkSession in Spark 2.0. For more articles about SparkSession, see: "SparkSession: A New Entry Point", "Introduction to Spark 2.0: Creating and Using the Related APIs", and "Apache Spark 2.0.0 Officially Released and Its Features".
Exploring SparkSession's Unified Functionality
First, let's walk through a simple Spark application, SparkSessionZipsExample, which reads zip code data from a JSON file, does some analysis with the DataFrame API, and then runs a few queries with Spark SQL, all without using SparkContext, SQLContext, or HiveContext.
Creating a SparkSession
Before version 2.0, you had to create a SparkConf and a SparkContext before interacting with Spark, as shown below:
//set up the spark configuration and create contexts
val sparkConf = new SparkConf()
  .setAppName("SparkSessionZipsExample")
  .setMaster("local")
  .set("spark.some.config.option", "some-value")   // set() belongs on SparkConf, not SparkContext
// your handle to SparkContext to access other contexts like SQLContext
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
In Spark 2.0, however, we can get the same functionality through SparkSession without explicitly creating SparkConf, SparkContext, or SQLContext, because these objects are encapsulated inside SparkSession. Using the builder design pattern, getOrCreate instantiates a new SparkSession object and its associated contexts if a SparkSession does not already exist.
// Create a SparkSession. No need to create SparkContext
// You automatically get it as part of the SparkSession
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
val spark = SparkSession
  .builder()
  .appName("SparkSessionZipsExample")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()
From here on we can use the spark object created above and access its public methods.
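For instance, the session still exposes the underlying SparkContext, along with the Spark version, so the older handles remain reachable when you need them. A minimal sketch, assuming the spark value created above:

// the SparkContext wrapped by the session is still accessible
val sc = spark.sparkContext
println(spark.version)       // version of Spark the session is running on
println(sc.applicationId)    // id of the underlying Spark application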
Configuring Spark's Runtime Properties
Once the SparkSession has been created, we can configure Spark's runtime properties. In the snippet below, for example, we change some existing runtime configuration options.
//set new runtime options
spark.conf.set("spark.sql.shuffle.partitions", 6)
spark.conf.set("spark.executor.memory", "2g")
//get all settings
val configMap: Map[String, String] = spark.conf.getAll
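Single values can be read back as well. A small sketch, assuming the keys set above:

// read one option back (returned as a String)
val shufflePartitions = spark.conf.get("spark.sql.shuffle.partitions")
// or fetch it safely as an Option[String]
val executorMemory = spark.conf.getOption("spark.executor.memory")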
Accessing the Catalog Metadata
We often want to access the catalog metadata of the current system. SparkSession provides a catalog instance for working with the metastore. Its methods return Datasets, so we can use the Dataset API to access the data. In the following snippet we list all the databases and all the tables:
//fetch metadata data from the catalog
scala> spark.catalog.listDatabases.show(false)
+-------+---------------------+-------------------------------------------------+
|name   |description          |locationUri                                      |
+-------+---------------------+-------------------------------------------------+
|default|Default Hive database|hdfs://iteblogcluster/user/iteblog/hive/warehouse|
+-------+---------------------+-------------------------------------------------+

scala> spark.catalog.listTables.show(false)
+-------+--------+-----------+---------+-----------+
|name   |database|description|tableType|isTemporary|
+-------+--------+-----------+---------+-----------+
|iteblog|default |null       |MANAGED  |false      |
|table2 |default |null       |EXTERNAL |false      |
|test   |default |null       |MANAGED  |false      |
+-------+--------+-----------+---------+-----------+
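The catalog offers more than listings; for example, you can inspect the columns of a table or switch the current database. A hedged sketch, using the iteblog table shown above:

// list the columns of an existing table
spark.catalog.listColumns("iteblog").show(false)
// check and change the current database
println(spark.catalog.currentDatabase)
spark.catalog.setCurrentDatabase("default")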
Creating Datasets and DataFrames
There are many ways to create DataFrames and Datasets with the SparkSession APIs. One of the simplest is the spark.range method, which creates a Dataset. This method is very handy when learning how to work with the Dataset API. For example:
scala> val numDS = spark.range(5, 100, 5)
numDS: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> numDS.orderBy(desc("id")).show(5)
+---+
| id|
+---+
| 95|
| 90|
| 85|
| 80|
| 75|
+---+
only showing top 5 rows

scala> numDS.describe().show()
+-------+------------------+
|summary|                id|
+-------+------------------+
|  count|                19|
|   mean|              50.0|
| stddev|28.136571693556885|
|    min|                 5|
|    max|                95|
+-------+------------------+

scala> val langPercentDF = spark.createDataFrame(List(("Scala", 35),
     |   ("Python", 30), ("R", 15), ("Java", 20)))
langPercentDF: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> val lpDF = langPercentDF.withColumnRenamed("_1", "language").withColumnRenamed("_2", "percent")
lpDF: org.apache.spark.sql.DataFrame = [language: string, percent: int]

scala> lpDF.orderBy(desc("percent")).show(false)
+--------+-------+
|language|percent|
+--------+-------+
|Scala   |35     |
|Python  |30     |
|Java    |20     |
|R       |15     |
+--------+-------+
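A strongly typed Dataset can also be built from a case class via the session's implicits. The following is only a sketch; the LangPercent case class is hypothetical and used purely for illustration:

import org.apache.spark.sql.functions.desc
import spark.implicits._

// hypothetical case class for illustration; define it outside any method
case class LangPercent(language: String, percent: Int)

val lpDS = Seq(LangPercent("Scala", 35), LangPercent("Python", 30),
               LangPercent("R", 15), LangPercent("Java", 20)).toDS()
lpDS.orderBy(desc("percent")).show(false)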
Reading CSV with SparkSession
Once the SparkSession has been created, we can use it to read data. The snippet below uses SparkSession to read data from a CSV file:
val df = sparkSession.read.option("header", "true")
  .csv("src/main/resources/sales.csv")
This code looks very much like reading data with SQLContext; we can now use SparkSession in place of the SQLContext-based code we wrote before. Here is the complete example:
package com.iteblog

import org.apache.spark.sql.SparkSession

/**
 * Spark Session example
 */
object SparkSessionExample {

  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession.builder
      .master("local")
      .appName("spark session example")
      .getOrCreate()

    val df = sparkSession.read.option("header", "true").csv("src/main/resources/sales.csv")

    df.show()
  }
}
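By default the CSV reader treats every column as a string; if you want Spark to guess numeric types you can additionally enable schema inference. A hedged variant of the read above, using the same assumed sales.csv path:

val typedDF = sparkSession.read
  .option("header", "true")
  .option("inferSchema", "true")   // infer column types instead of reading everything as string
  .csv("src/main/resources/sales.csv")
typedDF.printSchema()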
Reading JSON Data with the SparkSession API
We can use SparkSession to read JSON, CSV, or text files, and even Parquet tables. In the snippet below, for example, I read a JSON file of zip code data and get back a DataFrame:
// read the json file and create the dataframe
scala> val jsonFile = "/user/iteblog.json"
jsonFile: String = /user/iteblog.json

scala> val zipsDF = spark.read.json(jsonFile)
zipsDF: org.apache.spark.sql.DataFrame = [_id: string, city: string ... 3 more fields]

scala> zipsDF.filter(zipsDF.col("pop") > 40000).show(10, false)
+-----+----------+-----------------------+-----+-----+
|_id  |city      |loc                    |pop  |state|
+-----+----------+-----------------------+-----+-----+
|01040|HOLYOKE   |[-72.626193, 42.202007]|43704|MA   |
|01085|MONTGOMERY|[-72.754318, 42.129484]|40117|MA   |
|01201|PITTSFIELD|[-73.247088, 42.453086]|50655|MA   |
|01420|FITCHBURG |[-71.803133, 42.579563]|41194|MA   |
|01701|FRAMINGHAM|[-71.425486, 42.300665]|65046|MA   |
|01841|LAWRENCE  |[-71.166997, 42.711545]|45555|MA   |
|01902|LYNN      |[-70.941989, 42.469814]|41625|MA   |
|01960|PEABODY   |[-70.961194, 42.532579]|47685|MA   |
|02124|DORCHESTER|[-71.072898, 42.287984]|48560|MA   |
|02146|BROOKLINE |[-71.128917, 42.339158]|56614|MA   |
+-----+----------+-----------------------+-----+-----+
only showing top 10 rows
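Once the DataFrame exists, the usual DataFrame API operations apply. For example, a sketch that aggregates population per state, assuming the column names shown in the output above:

import org.apache.spark.sql.functions.desc

// total population per state using the DataFrame API
zipsDF.select("state", "pop")
  .groupBy("state")
  .sum("pop")
  .withColumnRenamed("sum(pop)", "total_pop")
  .orderBy(desc("total_pop"))
  .show(5)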
Using Spark SQL through SparkSession
Through SparkSession we can access all of Spark SQL's functionality, just as you would with SQLContext. In the code below we create a temporary view and run SQL queries against it:
// Now create an SQL table and issue SQL queries against it without
// using the sqlContext but through the SparkSession object.
// Creates a temporary view of the DataFrame
scala> zipsDF.createOrReplaceTempView("zips_table")

scala> zipsDF.cache()
res3: zipsDF.type = [_id: string, city: string ... 3 more fields]

scala> val resultsDF = spark.sql("SELECT city, pop, state, _id FROM zips_table")
resultsDF: org.apache.spark.sql.DataFrame = [city: string, pop: bigint ... 2 more fields]

scala> resultsDF.show(10)
+------------+-----+-----+-----+
|        city|  pop|state|  _id|
+------------+-----+-----+-----+
|      AGAWAM|15338|   MA|01001|
|     CUSHMAN|36963|   MA|01002|
|       BARRE| 4546|   MA|01005|
| BELCHERTOWN|10579|   MA|01007|
|   BLANDFORD| 1240|   MA|01008|
|   BRIMFIELD| 3706|   MA|01010|
|     CHESTER| 1688|   MA|01011|
|CHESTERFIELD|  177|   MA|01012|
|    CHICOPEE|23396|   MA|01013|
|    CHICOPEE|31495|   MA|01020|
+------------+-----+-----+-----+
only showing top 10 rows
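More involved SQL works the same way. For instance, a hedged aggregate query over the same temporary view:

// aggregate query against the temporary view
val popByStateDF = spark.sql(
  "SELECT state, SUM(pop) AS total_pop FROM zips_table GROUP BY state ORDER BY total_pop DESC")
popByStateDF.show(5)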
Reading and Writing Hive Tables with SparkSession
Next we use SparkSession to create a Hive table and run some SQL queries against it, just as you would with HiveContext:
- scala> spark.sql("DROP TABLE IF EXISTS iteblog_hive")
- res5: org.apache.spark.sql.DataFrame = []
-
- scala> spark.table("zips_table").write.saveAsTable("iteblog_hive")
- 16/08/24 21:52:59 WARN HiveMetaStore: Location: hdfs://iteblogcluster/user/iteblog/hive/warehouse/iteblog_hive specified for non-external table:iteblog_hive
-
- scala> val resultsHiveDF = spark.sql("SELECT city, pop, state, _id FROM iteblog_hive WHERE pop > 40000")
- resultsHiveDF: org.apache.spark.sql.DataFrame = [city: string, pop: bigint ... 2 more fields]
-
- scala> resultsHiveDF.show(10)
- +----------+-----+-----+-----+
- | city| pop|state| _id|
- +----------+-----+-----+-----+
- | HOLYOKE|43704| MA|01040|
- |MONTGOMERY|40117| MA|01085|
- |PITTSFIELD|50655| MA|01201|
- | FITCHBURG|41194| MA|01420|
- |FRAMINGHAM|65046| MA|01701|
- | LAWRENCE|45555| MA|01841|
- | LYNN|41625| MA|01902|
- | PEABODY|47685| MA|01960|
- |DORCHESTER|48560| MA|02124|
- | BROOKLINE|56614| MA|02146|
- +----------+-----+-----+-----+
- only showing top 10 rows
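The saved Hive table can also be queried back through the DataFrame API rather than SQL; a sketch equivalent to the query above:

import org.apache.spark.sql.functions.col

// the same filter expressed with the DataFrame API against the Hive table
spark.table("iteblog_hive")
  .filter(col("pop") > 40000)
  .select("city", "pop", "state", "_id")
  .show(10)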
As you can see, the results are the same whether you use the DataFrame API, Spark SQL, or Hive queries.
This article is translated from: https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html