免费视频淫片aa毛片_日韩高清在线亚洲专区vr_日韩大片免费观看视频播放_亚洲欧美国产精品完整版

打開APP
userphoto
未登錄

開通VIP,暢享免費電子書等14項超值服

開通VIP
大數(shù)據(jù)IMF傳奇行動絕密課程第72課:Spark SQL UDF和UDAF解密與實戰(zhàn)

第72課:Spark SQL UDF和UDAF解密與實戰(zhàn)

/**  * scala代碼  */package com.tom.spark.sqlimport org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}import org.apache.spark.sql.types._import org.apache.spark.sql.{Row, SQLContext}import org.apache.spark.{SparkConf, SparkContext}/**  * UDF:User Defined Function, 用戶自定義的函數(shù),函數(shù)的輸入是一條具體的數(shù)據(jù)記錄,實現(xiàn)上講就是普通的scala函數(shù);  * UDAF:User Defined Aggregation Function, 用戶自定義的聚合函數(shù),函數(shù)本身作用于數(shù)據(jù)集合,能夠在聚合操作的基礎(chǔ)上進行自定義操作;  * 實質(zhì)上講,例如說UDF會被Spark SQL中的catalyst封裝成為expression,最終會通過eval方法來計算輸入的輸入Row,此處的Row和DataFrame  * 中的Row沒有任何關(guān)系  */object SparkSQLUDFUDAF {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local[4]").setAppName("SparkSQLUDFUDAF")    val sc = new SparkContext(conf)    val sqlContext = new SQLContext(sc)    //模擬實際使用的數(shù)據(jù)    val bigData = Array("Spark", "Spark", "Hadoop", "spark", "Hadoop", "spark", "Hadoop", "Hadoop", "spark", "spark")    /**      * 基于提供的數(shù)據(jù)創(chuàng)建DataFrame      */    val bigDataRdd = sc.parallelize(bigData)    val bigDataRDDRow = bigDataRdd.map(item => {Row(item)})    val structType =  StructType(Array(      new StructField("word", StringType, true)    ))    val bigDataDF = sqlContext.createDataFrame(bigDataRDDRow, structType)    bigDataDF.registerTempTable("bigDataTable") //注冊成為臨時表    /**      * 通過SQLContext注冊UDF,在Scala 2.10.x版本UDF函數(shù)最多可以接收22個輸入?yún)?shù)      */    sqlContext.udf.register("computeLength", (input: String) => input.length)    //直接在sql中使用udf,就像使用SQL自帶的內(nèi)部函數(shù)一樣    sqlContext.sql("select word, computeLength(word) as length from bigDataTable").show    sqlContext.udf.register("wordcount", new MyUDAF)    sqlContext.sql("select word, wordcount(word) as count,computeLength(word) as length " +      "from bigDataTable group by word").show//    while(true){}  }}/**  * 按照模板實現(xiàn)UDAF  */class MyUDAF extends UserDefinedAggregateFunction {  /**    * 該方法指定具體輸入數(shù)據(jù)的類型    * @return    */  override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true)))  /**    * 在進行聚合操作的時候所要處理的數(shù)據(jù)的結(jié)果的類型    * @return    */  override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))  /**    * 指定UDAF函數(shù)計算后返回的結(jié)果類型    * @return    */  override def dataType: DataType = IntegerType  /**    * 確保一致性,一般都用true    * @return    */  override def deterministic: Boolean = true  /**    * 在Aggregate之前每組數(shù)據(jù)的初始化結(jié)果    * @param buffer    */  override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0 }  /**    * 在進行聚合的時候,每當(dāng)有新的值進來,對分組后的聚合如何進行計算    * 本地的聚合操作,相當(dāng)于Hadoop MapReduce模型中的Combiner    * @param buffer    * @param input    */  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {    buffer(0) = buffer.getAs[Int](0) + 1  }  /**    * 最后在分布式節(jié)點進行Local Reduce完成后需要進行全局級別的Merge操作    * @param buffer1    * @param buffer2    */  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {    buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)  }  /**    * 返回UDAF最后的計算結(jié)果    * @param buffer    * @return    */  override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)}
本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊舉報。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
SparkSQL內(nèi)置函數(shù)
大數(shù)據(jù)開發(fā)技術(shù)之Spark SQL的多種使用方法
關(guān)于spark寫入文件至文件系統(tǒng)并制定文件名之自定義outputFormat
SparkSession簡單介紹
用spark streaming實時讀取hdfs數(shù)據(jù)并寫入elasticsearch中
Spark入門:讀寫Parquet(DataFrame)
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點擊這里聯(lián)系客服!

聯(lián)系客服