免费视频淫片aa毛片_日韩高清在线亚洲专区vr_日韩大片免费观看视频播放_亚洲欧美国产精品完整版

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開通VIP
SparkSession簡(jiǎn)單介紹

       Apache Spark 2.0引入了SparkSession,其為用戶提供了一個(gè)統(tǒng)一的切入點(diǎn)來使用Spark的各項(xiàng)功能,并且允許用戶通過它調(diào)用DataFrame和Dataset相關(guān)API來編寫Spark程序。最重要的是,它減少了用戶需要了解的一些概念,使得我們可以很容易地與Spark交互。
  本文我們將介紹在Spark 2.0中如何使用SparkSession。更多關(guān)于SparkSession的文章請(qǐng)參見:《SparkSession:新的切入點(diǎn)》、《Spark 2.0介紹:創(chuàng)建和使用相關(guān)API》、《Apache Spark 2.0.0正式發(fā)布及其功能介紹》


探索SparkSession統(tǒng)一的功能

  首先,我們介紹一個(gè)簡(jiǎn)單的Spark應(yīng)用案例:SparkSessionZipsExample,其從JSON文件中讀取郵政編碼,并且通過DataFrame API進(jìn)行一些分析,之后使用Spark SQL進(jìn)行一些查詢,這些操作并沒有使用到SparkContext, SQLContext 或者HiveContext。

創(chuàng)建SparkSession

在2.0版本之前,與Spark交互之前必須先創(chuàng)建SparkConf和SparkContext,代碼如下:

  1. //set up the spark configuration and create contexts  
  2. val sparkConf = new SparkConf().setAppName("SparkSessionZipsExample").setMaster("local")  
  3. // your handle to SparkContext to access other context like SQLContext  
  4. val sc = new SparkContext(sparkConf).set("spark.some.config.option", "some-value")  
  5. val sqlContext = new org.apache.spark.sql.SQLContext(sc)  
  然而在Spark 2.0中,我們可以通過SparkSession來實(shí)現(xiàn)同樣的功能,而不需要顯式地創(chuàng)建SparkConf, SparkContext 以及 SQLContext,因?yàn)檫@些對(duì)象已經(jīng)封裝在SparkSession中。使用生成器的設(shè)計(jì)模式(builder design pattern),如果我們沒有創(chuàng)建SparkSession對(duì)象,則會(huì)實(shí)例化出一個(gè)新的SparkSession對(duì)象及其相關(guān)的上下文。
  1. // Create a SparkSession. No need to create SparkContext  
  2. // You automatically get it as part of the SparkSession  
  3. val warehouseLocation = "file:${system:user.dir}/spark-warehouse"  
  4. val spark = SparkSession  
  5.    .builder()  
  6.    .appName("SparkSessionZipsExample")  
  7.    .config("spark.sql.warehouse.dir", warehouseLocation)  
  8.    .enableHiveSupport()  
  9.    .getOrCreate()  
到現(xiàn)在我們可以使用上面創(chuàng)建好的spark對(duì)象,并且訪問其public方法。

配置Spark運(yùn)行相關(guān)屬性

  一旦我們創(chuàng)建好了SparkSession,我們就可以配置Spark運(yùn)行相關(guān)屬性。比如下面代碼片段我們修改了已經(jīng)存在的運(yùn)行配置選項(xiàng)。
  1. //set new runtime options  
  2. spark.conf.set("spark.sql.shuffle.partitions", 6)  
  3. spark.conf.set("spark.executor.memory", "2g")  
  4. //get all settings  
  5. val configMap:Map[String, String] = spark.conf.getAll()  

獲取Catalog元數(shù)據(jù)

  通常我們想訪問當(dāng)前系統(tǒng)的Catalog元數(shù)據(jù)。SparkSession提供了catalog實(shí)例來操作metastore。這些方法放回的都是Dataset類型的,所有我們可以使用Dataset相關(guān)的API來訪問其中的數(shù)據(jù)。如下代碼片段,我們展示了所有的表并且列出當(dāng)前所有的數(shù)據(jù)庫:
  1. //fetch metadata data from the catalog  
  2. scala> spark.catalog.listDatabases.show(false)  
  3. +--------------+---------------------+--------------------------------------------------------+  
  4. |name          |description          |locationUri                                             |  
  5. +--------------+---------------------+--------------------------------------------------------+  
  6. |default       |Default Hive database|hdfs://iteblogcluster/user/iteblog/hive/warehouse       |  
  7. +--------------+---------------------+--------------------------------------------------------+  
  8.    
  9. scala> spark.catalog.listTables.show(false)  
  10. +----------------------------------------+--------+-----------+---------+-----------+  
  11. |name                                    |database|description|tableType|isTemporary|  
  12. +----------------------------------------+--------+-----------+---------+-----------+  
  13. |iteblog                                 |default |null       |MANAGED  |false      |  
  14. |table2                                  |default |null       |EXTERNAL |false      |  
  15. |test                                    |default |null       |MANAGED  |false      |  
  16. +----------------------------------------+--------+-----------+---------+-----------+  

創(chuàng)建Dataset和Dataframe

  使用SparkSession APIs創(chuàng)建 DataFrames 和 Datasets的方法有很多,其中最簡(jiǎn)單的方式就是使用spark.range方法來創(chuàng)建一個(gè)Dataset。當(dāng)我們學(xué)習(xí)如何操作Dataset API的時(shí)候,這個(gè)方法非常有用。操作如下:
  1. scala> val numDS = spark.range(5, 100, 5)  
  2. numDS: org.apache.spark.sql.Dataset[Long] = [id: bigint]  
  3.    
  4. scala> numDS.orderBy(desc("id")).show(5)  
  5. +---+  
  6. | id|  
  7. +---+  
  8. | 95|  
  9. | 90|  
  10. | 85|  
  11. | 80|  
  12. | 75|  
  13. +---+  
  14. only showing top 5 rows  
  15.    
  16. scala> numDS.describe().show()  
  17. +-------+------------------+  
  18. |summary|                id|  
  19. +-------+------------------+  
  20. |  count|                19|  
  21. |   mean|              50.0|  
  22. | stddev|28.136571693556885|  
  23. |    min|                 5|  
  24. |    max|                95|  
  25. +-------+------------------+  
  26. scala> val langPercentDF = spark.createDataFrame(List(("Scala", 35),  
  27.      | ("Python", 30), ("R", 15), ("Java", 20)))  
  28. langPercentDF: org.apache.spark.sql.DataFrame = [_1: string, _2: int]  
  29.    
  30. scala> val lpDF = langPercentDF.withColumnRenamed("_1", "language").withColumnRenamed("_2", "percent")  
  31. lpDF: org.apache.spark.sql.DataFrame = [language: string, percent: int]  
  32.    
  33. scala> lpDF.orderBy(desc("percent")).show(false)  
  34. +--------+-------+                                                                
  35. |language|percent|  
  36. +--------+-------+  
  37. |Scala   |35     |  
  38. |Python  |30     |  
  39. |Java    |20     |  
  40. |R       |15     |  
  41. +--------+-------+  


使用SparkSession讀取CSV

創(chuàng)建完SparkSession之后,我們就可以使用它來讀取數(shù)據(jù),下面代碼片段是使用SparkSession來從csv文件中讀取數(shù)據(jù):
  1. val df = sparkSession.read.option("header","true").  
  2.     csv("src/main/resources/sales.csv")  
上面代碼非常像使用SQLContext來讀取數(shù)據(jù),我們現(xiàn)在可以使用SparkSession來替代之前使用SQLContext編寫的代碼。下面是完整的代碼片段:
  1. package com.iteblog  
  2.    
  3. import org.apache.spark.sql.SparkSession  
  4.    
  5. /**  
  6.   * Spark Session example  
  7.   *  
  8.   */  
  9. object SparkSessionExample {  
  10.    
  11.   def main(args: Array[String]) {  
  12.    
  13.     val sparkSession = SparkSession.builder.  
  14.       master("local")  
  15.       .appName("spark session example")  
  16.       .getOrCreate()  
  17.    
  18.     val df = sparkSession.read.option("header","true").csv("src/main/resources/sales.csv")  
  19.    
  20.     df.show()  
  21.    
  22.   }  
  23.    
  24. }  

使用SparkSession API讀取JSON數(shù)據(jù)

  我們可以使用SparkSession來讀取JSON、CVS或者TXT文件,甚至是讀取parquet表。比如在下面代碼片段里面,我將讀取郵編數(shù)據(jù)的JSON文件,并且返回DataFrame對(duì)象:
  1. // read the json file and create the dataframe  
  2. scala> val jsonFile = "/user/iteblog.json"  
  3. jsonFile: String = /user/iteblog.json  
  4. scala> val zipsDF = spark.read.json(jsonFile)  
  5. zipsDF: org.apache.spark.sql.DataFrame = [_id: string, city: string ... 3 more fields]  
  6.    
  7. scala> zipsDF.filter(zipsDF.col("pop") > 40000).show(10, false)  
  8. +-----+----------+-----------------------+-----+-----+  
  9. |_id  |city      |loc                    |pop  |state|  
  10. +-----+----------+-----------------------+-----+-----+  
  11. |01040|HOLYOKE   |[-72.626193, 42.202007]|43704|MA   |  
  12. |01085|MONTGOMERY|[-72.754318, 42.129484]|40117|MA   |  
  13. |01201|PITTSFIELD|[-73.247088, 42.453086]|50655|MA   |  
  14. |01420|FITCHBURG |[-71.803133, 42.579563]|41194|MA   |  
  15. |01701|FRAMINGHAM|[-71.425486, 42.300665]|65046|MA   |  
  16. |01841|LAWRENCE  |[-71.166997, 42.711545]|45555|MA   |  
  17. |01902|LYNN      |[-70.941989, 42.469814]|41625|MA   |  
  18. |01960|PEABODY   |[-70.961194, 42.532579]|47685|MA   |  
  19. |02124|DORCHESTER|[-71.072898, 42.287984]|48560|MA   |  
  20. |02146|BROOKLINE |[-71.128917, 42.339158]|56614|MA   |  
  21. +-----+----------+-----------------------+-----+-----+  
  22. only showing top 10 rows  

在SparkSession中還用Spark SQL

  通過SparkSession我們可以訪問Spark SQL中所有函數(shù),正如你使用SQLContext訪問一樣。下面代碼片段中,我們創(chuàng)建了一個(gè)表,并在其中使用SQL查詢:
  1. // Now create an SQL table and issue SQL queries against it without  
  2. // using the sqlContext but through the SparkSession object.  
  3. // Creates a temporary view of the DataFrame  
  4. scala> zipsDF.createOrReplaceTempView("zips_table")  
  5.    
  6. scala> zipsDF.cache()  
  7. res3: zipsDF.type = [_id: string, city: string ... 3 more fields]  
  8.    
  9. scala> val resultsDF = spark.sql("SELECT city, pop, state, _id FROM zips_table")  
  10. resultsDF: org.apache.spark.sql.DataFrame = [city: string, pop: bigint ... 2 more fields]  
  11.    
  12. scala> resultsDF.show(10)  
  13. +------------+-----+-----+-----+  
  14. |        city|  pop|state|  _id|  
  15. +------------+-----+-----+-----+  
  16. |      AGAWAM|15338|   MA|01001|  
  17. |     CUSHMAN|36963|   MA|01002|  
  18. |       BARRE| 4546|   MA|01005|  
  19. | BELCHERTOWN|10579|   MA|01007|  
  20. |   BLANDFORD| 1240|   MA|01008|  
  21. |   BRIMFIELD| 3706|   MA|01010|  
  22. |     CHESTER| 1688|   MA|01011|  
  23. |CHESTERFIELD|  177|   MA|01012|  
  24. |    CHICOPEE|23396|   MA|01013|  
  25. |    CHICOPEE|31495|   MA|01020|  
  26. +------------+-----+-----+-----+  
  27. only showing top 10 rows  

使用SparkSession讀寫Hive表

下面我們將使用SparkSession創(chuàng)建一個(gè)Hive表,并且對(duì)這個(gè)表進(jìn)行一些SQL查詢,正如你使用HiveContext一樣:
  1. scala> spark.sql("DROP TABLE IF EXISTS iteblog_hive")  
  2. res5: org.apache.spark.sql.DataFrame = []  
  3.    
  4. scala> spark.table("zips_table").write.saveAsTable("iteblog_hive")  
  5. 16/08/24 21:52:59 WARN HiveMetaStore: Location: hdfs://iteblogcluster/user/iteblog/hive/warehouse/iteblog_hive specified for non-external table:iteblog_hive  
  6.    
  7. scala> val resultsHiveDF = spark.sql("SELECT city, pop, state, _id FROM iteblog_hive WHERE pop > 40000")  
  8. resultsHiveDF: org.apache.spark.sql.DataFrame = [city: string, pop: bigint ... 2 more fields]  
  9.    
  10. scala> resultsHiveDF.show(10)  
  11. +----------+-----+-----+-----+  
  12. |      city|  pop|state|  _id|  
  13. +----------+-----+-----+-----+  
  14. |   HOLYOKE|43704|   MA|01040|  
  15. |MONTGOMERY|40117|   MA|01085|  
  16. |PITTSFIELD|50655|   MA|01201|  
  17. | FITCHBURG|41194|   MA|01420|  
  18. |FRAMINGHAM|65046|   MA|01701|  
  19. |  LAWRENCE|45555|   MA|01841|  
  20. |      LYNN|41625|   MA|01902|  
  21. |   PEABODY|47685|   MA|01960|  
  22. |DORCHESTER|48560|   MA|02124|  
  23. | BROOKLINE|56614|   MA|02146|  
  24. +----------+-----+-----+-----+  
  25. only showing top 10 rows  

正如你所見,你使用DataFrame API, Spark SQL 以及 Hive查詢的結(jié)果都一樣。

本文翻譯自:https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html

本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊舉報(bào)
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
理解Spark SQL(三)—— Spark SQL程序舉例
Spark計(jì)算引擎之SparkSQL詳解
獨(dú)家 | PySpark和SparkSQL基礎(chǔ):如何利用Python編程執(zhí)行Spark(附代碼)
Apache 兩個(gè)開源項(xiàng)目比較:Flink vs Spark
IBM專家深入淺出講解Spark2.0
Spark的Dataset操作(一)
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長(zhǎng)圖 關(guān)注 下載文章
綁定賬號(hào)成功
后續(xù)可登錄賬號(hào)暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服