免费视频淫片aa毛片_日韩高清在线亚洲专区vr_日韩大片免费观看视频播放_亚洲欧美国产精品完整版

打開APP
userphoto
未登錄

開通VIP,暢享免費電子書等14項超值服

開通VIP
關于spark寫入文件至文件系統(tǒng)并制定文件名之自定義outputFormat
https://www.cnblogs.com/Gxiaobai/p/10705712.html
引言:
spark項目中通常我們需要將我們處理之后數據保存到文件中,比如將處理之后的RDD保存到hdfs上指定的目錄中,亦或是保存在本地
spark保存文件:
1、rdd.saveAsTextFile("file:///E:/dataFile/result")
2、rdd.saveAsHadoopFile("file:///E:/dataFile/result",classOf[T],classOf[T],classOf[outputFormat.class])
3、df.write.format("csv").save("file:///E:/dataFile/result")
以上都簡單的,最普遍的保存文件的方式,有時候是不能夠滿足我們的需求,上述的文件保存方式中,保存之后,文件名通常是part-00000的方式保存在result文件夾中,但是,我希望能夠根據需求自己來定義這個文件名,并且指定的保存的文件夾必須事先不能存在,如果存在的話保存文件會報錯。
此時就需要我們自定義文件保存名。
自定義保存文件名:
需要自定義保存的文件名的話,就需要我們重新對輸出的文件的方式進行一個格式化,也就是說不能夠使用系統(tǒng)默認的輸出文件的方式,需要我們自定義輸出格式,需要重寫outputFormat類。
示例:
需求:需要將數據庫中的數據通過sparksql讀取之后進行計算,然后進行計算,最終以指定的文件名寫入到指定的目錄下面:
數據庫內容:
保存之后的文件:
保存路徑:本地“E:/dataFile/result”,該目錄下,文件名為person.txt
保存之后文件名:
保存后文件內容:
代碼實現:
需要自定一個一個類重寫outputFormat類中的方法
這里我使用saveAsHadoopFile的方式進行保存文件,如果是使用saveAsTextFile的方式的話,因為只有能傳入一個參數,
saveAsHadoopFile的形式保存文件,該方式是針對<k,v>對的RDD進行保存,保存的文件中內容是key和value,以空格分開,相同的key或保存在同一個文件中
上代碼:
第一步:重寫FileoutputFormat類
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package cn.com.xxx.audit
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
class CustomOutputFormat extends MultipleTextOutputFormat[Any, Any] {
<br> //重寫generateFileNameForKeyValue方法,該方法是負責自定義生成文件的文件名
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {<br>  //這里的key和value指的就是要寫入文件的rdd對,再此,我定義文件名以key.txt來命名,當然也可以根據其他的需求來進行生成文件名
val fileName = key.asInstanceOf[String] + ".txt"
fileName
}
/**<br>   *因為saveAsHadoopFile是以key,value的形式保存文件,寫入文件之后的內容也是,按照key value的形式寫入,k,v之間用空格隔開,這里我只需要寫入value的值,不需要將key的值寫入到文件中個,所以我需要重寫<br>   *該方法,讓輸入到文件中的key為空即可,當然也可以進行領過的變通,也可以重寫generateActuralValue(key:Any,value:Any),根據自己的需求來實現<br>   */
override def generateActualKey(key: Any, value: Any): String = {
null
}
//對生成的value進行轉換為字符串,當然源碼中默認也是直接返回value值,如果對value沒有特殊處理的話,不需要重寫該方法<br>  override def generateAcutalValue(key: Any, value: Any): String = {<br>     return value.asInstance[String]<br>  }<br> /**<br>   * 該方法使用來檢查我們輸出的文件目錄是否存在,源碼中,是這樣判斷的,如果寫入的父目錄已經存在的話,則拋出異常<br>   * 在這里我們沖寫這個方法,修改文件目錄的判斷方式,如果傳入的文件寫入目錄已存在的話,直接將其設置為輸出目錄即可,<br>   * 不會拋出異常<br>   */
override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = {
var outDir: Path = FileOutputFormat.getOutputPath(job)
if (outDir != null) {<br>    //注意下面的這兩句,如果說你要是寫入文件的路徑是hdfs的話,下面的兩句不要寫,或是注釋掉,它倆的作用是標準化文件輸出目錄,根據我的理解是,他們是標準化本地路徑,寫入本地的話,可以加上,本地路徑記得要用file:///開頭,比如file:///E:/a.txt
//val fs: FileSystem = ignored
//outDir = fs.makeQualified(outDir)
FileOutputFormat.setOutputPath(job, outDir)
}
}
}
第二步:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package scala.spark._sql
import java.util.Properties
import mysqlUtils.OperatorMySql
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
object DataFrameToMySql {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
//配置輸出文件不生成success文件
sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
//配置一些參數
//如果設置為true,sparkSql將會根據數據統(tǒng)計信息,自動為每一列選擇單獨的壓縮編碼方式
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
//控制列式緩存批量的大小。增大批量大小可以提高內存的利用率和壓縮率,但同時也會帶來OOM的風險
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "1000")
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "10485760")
//設為true,則啟用優(yōu)化的Tungsten物理執(zhí)行后端。Tungsten會顯示的管理內存,并動態(tài)生成表達式求值得字節(jié)碼
sqlContext.setConf("spark.sql.tungsten.enabled", "true")
//配置shuffle是的使用的分區(qū)數
sqlContext.setConf("spark.sql.shuffle.partitions", "200")
sc.setLogLevel("WARN")
val pro = new Properties()
pro.put("user", "root")
pro.put("password", "123456")
pro.put("driver", "com.mysql.jdbc.Driver")
val url = "jdbc:mysql://localhost:3306/test?serverTimezone=UTC"
val rdf = sqlContext.read /*.jdbc(url,"person1",pro)*/
.format("jdbc")
.options(Map(
"url" -> url,
"dbtable" -> "person",
"driver" -> "com.mysql.jdbc.Driver",
"user" -> "root",
"password" -> "123456",
"fetchSize" -> "10",
"partitionColumn" -> "age",
"lowerBound" -> "0",
"upperBound" -> "1000",
"numPartitions" -> "2"
)).load()         //將讀取的文件盡心個計算,并且以pairRDD的形式寫入文件中,這里在寫入文件的時候,會將key當做文件名來進行寫入,也就是說相同的key對應的value都會寫入到相同的文件中
val x = rdf.groupBy(substring(col("score"), 0, 5) as ("score")).agg(max("age") as ("max"), avg("age") as ("avg"))
.rdd.map(x => ("person", x(0) + "," + x(1) + "," + x(2)))           //這里partitionBy,只是來增加文件文件寫入的并行度,可以根據需求進行設置,影響的是文件寫入的性能,我個人是這么理解的,如果有不對的還請指正
.partitionBy(new HashPartitioner(10))            //這里寫入的時候,要指定我們自定義的PairRDDMultipleTextOutputFormat類
.saveAsHadoopFile("file:///E:/dataFile/res", classOf[String], classOf[String], classOf[PairRDDMultipleTextOutputFormat])    sc.stop()
}
寫入結果:
文件內容:
文件名稱:
文件夾名稱:
E:\dataFile\res
改文件夾事先已經存在,因為重寫了checkOutputSpecs方法,做了處理,所以不會拋出異常,如果改文件夾目錄實現不存在的話,程序會自動去創(chuàng)建一個該文件夾
跟蹤FileOutput源碼
主要來看下我們重寫的這幾個方法,源碼中都做了些什么:
類名:MultipleOutputFormat
從源碼中可以很容易的看到各個類的實現。
這樣我們就可以根據我們的需求,將spark計算之后的數據寫入到我們指定的文件夾下面,并且指定生成的文件名。
這個問題搞了我兩三天了,網上各種找,都說是要重寫什么getRecordWriter方法,理清了思路之后,才發(fā)現,不是我需要的,在此記錄一下
本站僅提供存儲服務,所有內容均由用戶發(fā)布,如發(fā)現有害或侵權內容,請點擊舉報。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
ppt幻燈片制作基礎教程圖解
Spark入門實戰(zhàn)系列
RDD創(chuàng)建內幕徹底解密
part4-spark共享變量
Apache 兩個開源項目比較:Flink vs Spark
字節(jié)推薦算法終于開源!吹爆!|數據倉庫|原理|算法|編程
更多類似文章 >>
生活服務
分享 收藏 導長圖 關注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權!
如果VIP功能使用有故障,
可點擊這里聯系客服!

聯系客服