第一步:下載Eclipse IDE for JAVA Developer
第二步:解壓并啟動(dòng)Eclipse
第三步:創(chuàng)建Maven工程
第四步:使用maven-archetype-quickstart,設(shè)定一些包名
第五步:通過(guò)BuildPath把默認(rèn)的J2EE 1.5變成Java1.8
第六步:配置pom.xml,添加程序開發(fā)時(shí)的相關(guān)依賴,并配置具體build打包的信息
POM.xml
有各種依賴的支持
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.2</version></dependency><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.6.2</version></dependency><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.10</artifactId> <version>1.6.2</version></dependency><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.6.2</version></dependency><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.6.2</version></dependency><dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-graphx_2.10</artifactId> <version>1.6.2</version></dependency><dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.0</version></dependency>
http://maven.outofmemory.cn/org.apache.spark
package com.tom.spark.SparkApps.cores;import java.util.Arrays;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;/** * 使用Java的方式開發(fā)本地測(cè)試Spark的WordCount程序 * @author * */public class WordCount { public static void main(String[] args) { // TODO Auto-generated method stub /** * 第一步:創(chuàng)建Spark的配置對(duì)象,SparkConf,設(shè)置Spark程序的運(yùn)行時(shí)的配置信息 * 例如通過(guò)setMaster來(lái)設(shè)置程序要連接的spark集群的Master的URL,如果設(shè)置 * 為local,則代表Spark程序在本地運(yùn)行,特別適合于機(jī)器配置條件非常差的初學(xué)者 * */ SparkConf conf = new SparkConf().setAppName("Spark WordCount written by Java").setMaster("local"); /** * 第二步:創(chuàng)建SparkContext對(duì)象 * SparkContext是Spark程序所有功能的唯一入口,無(wú)論采用Scala、Java、Python、R等都必須有一個(gè)SparkContext(不同的語(yǔ)言具體的類名稱不同,Java則為JavaSparkContext) * SparkContext核心作用:初始化Spark應(yīng)用程序運(yùn)行所需要的核心組件,包括DAGScheduler、TaskScheduler、SchedulerBackEnd * 同時(shí)還會(huì)負(fù)責(zé)Spark程序往Master注冊(cè)程序等 * SparkContext是整個(gè)Spark應(yīng)用程序中最為至關(guān)重要的一個(gè)對(duì)象 */ JavaSparkContext sc = new JavaSparkContext(conf); /** * 第三步:根據(jù)具體的數(shù)據(jù)來(lái)源(HDFS、HBase、Local FS、DB、S3等)通過(guò)SparkContext來(lái)創(chuàng)建JavaRDD * JavaRDD的創(chuàng)建基本有三種方式:根據(jù)外部的數(shù)據(jù)來(lái)源(例如HDFS)、根據(jù)Scala集合、由其他JavaRDD操作 * 數(shù)據(jù)會(huì)被JavaRDD劃分成為一系列的Partitions,分配到每個(gè)Partition的數(shù)據(jù)屬于一個(gè)Task的處理范疇 */ JavaRDD<String> lines = sc.textFile("F:/channel.txt",1); /** * 第四步:對(duì)初始的JavaRDD進(jìn)行Transformation級(jí)別的處理,例如map、filter等高階函數(shù)等的編程,來(lái)進(jìn)行具體的數(shù)據(jù)計(jì)算 * 第4.1步:將每一行的字符串拆分成單個(gè)的單詞 */ JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>(){ //如果shiScala,由于SAM轉(zhuǎn)換,所以可以寫成val words = lines.flatMap(_.split(" ")) @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); /** * 第4.2步:在單詞拆分的基礎(chǔ)上對(duì)每個(gè)單詞實(shí)例計(jì)數(shù)為1,也就是word => (word, 1) */ JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception { // TODO Auto-generated method stub return new Tuple2<String, Integer> (word, 1); } }); /** * 第4.3步:在單詞實(shí)例計(jì)數(shù)為1基礎(chǔ)上,統(tǒng)計(jì)每個(gè)單詞在文件中出現(xiàn)的總次數(shù) */ JavaPairRDD<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){ //對(duì)相同的key,進(jìn)行Value的累加(包括Local和Reducer級(jí)別同時(shí)Reduce) @Override public Integer call(Integer v1, Integer v2) throws Exception { // TODO Auto-generated method stub return v1 + v2; } }); wordsCount.foreach(new VoidFunction<Tuple2<String, Integer>>(){ @Override public void call(Tuple2<String, Integer> pairs) throws Exception { // TODO Auto-generated method stub System.out.println(pairs._1 + " : " + pairs._2); } }); sc.close(); }}
作業(yè):放在集群上跑
聯(lián)系客服