基礎排序算法實戰
二次排序算法實戰
更高級排序算法
排序算法內幕解密
sc.setLogLevel("WARN")
基礎排序算法:
sc.textFile().flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_, 1).map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1)).collect
所謂二次排序,就是指,排序的時候考慮兩個維度
2 3
4 1
3 2
4 3
9 7
2 1
構造器要有val,因為要做個成員
Scala實現
package com.tom.spark

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Composite key for secondary sorting: orders by `first`, breaking ties on `second`.
 *
 * Constructor parameters are declared `val` so they become accessible members.
 * The key must be Serializable because Spark ships it between executors.
 */
class SecondarySortKey(val first: Int, val second: Int)
  extends Ordered[SecondarySortKey] with Serializable {

  /**
   * Compares by `first`, then by `second` on a tie.
   *
   * Uses Integer.compare instead of `this.first - other.first`: the
   * subtraction overflows for large-magnitude operands (e.g. Int.MinValue)
   * and silently inverts the ordering.
   */
  override def compare(other: SecondarySortKey): Int = {
    val byFirst = Integer.compare(this.first, other.first)
    if (byFirst != 0) byFirst
    else Integer.compare(this.second, other.second)
  }
}

object SecondarySortKey {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SecondarySortKey").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("F:/helloSpark2.txt")

    // Key each line by a SecondarySortKey built from its first two
    // space-separated fields; split once per line instead of twice.
    val pairWithSortKey = lines.map { line =>
      val fields = line.split(" ")
      (new SecondarySortKey(fields(0).toInt, fields(1).toInt), line)
    }

    // Sort on the composite key, then drop the key to keep only the lines.
    val sorted = pairWithSortKey.sortByKey()
    val sortedResult = sorted.map(pair => pair._2)
    sortedResult.collect().foreach(println)

    // Release the SparkContext; the original leaked it.
    sc.stop()
  }
}
Java實現
/** * SecondarySortKey.java */package com.tom.spark.SparkApps.cores;import java.io.Serializable;import scala.math.Ordered;/** * 自定義二次排序的Key */public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable{ //需要二次排序的Key private int first; private int second; //二次排序的公開構(gòu)造器 public SecondarySortKey(int first, int second) { this.first = first; this.second = second; } public int getFirst() { return first; } public void setFirst(int first) { this.first = first; } public int getSecond() { return second; } public void setSecond(int second) { this.second = second; } public boolean $greater(SecondarySortKey other) { // TODO Auto-generated method stub if(this.first > other.getSecond()) return true; else if(this.first == other.getFirst() && this.second > other.getSecond()) return true; else return false; } public boolean $greater$eq(SecondarySortKey other) { // TODO Auto-generated method stub if($greater(other)) return true; else if ( this.first == other.getFirst() && this.second == other.second) return true; else return false; } public boolean $less(SecondarySortKey other) { // TODO Auto-generated method stub return !$greater$eq(other); } public boolean $less$eq(SecondarySortKey other) { // TODO Auto-generated method stub return !$greater(other); } public int compare(SecondarySortKey other) { // TODO Auto-generated method stub if(this.first != other.getFirst()) return this.first - other.getFirst(); else return this.second - other.getSecond(); } public int compareTo(SecondarySortKey other) { // TODO Auto-generated method stub if(this.first != other.getFirst()) return this.first - other.getFirst(); else return this.second - other.getSecond(); } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + first; result = prime * result + second; return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; 
SecondarySortKey other = (SecondarySortKey) obj; if (first != other.first) return false; if (second != other.second) return false; return true; } /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub }}
/** * SecondarySortKeyApp.java */package com.tom.spark.SparkApps;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;import com.tom.spark.SparkApps.cores.SecondarySortKey;/** * 二次排序,具體實現(xiàn)步驟: * 第一步:按照Ordered和Serializable接口實現(xiàn)自定義排序的Key * 第二步:將要排序的二次排序的文件加載進(jìn)<Key, Value>類型的RDD * 第三步:使用sortByKey基于自定義的Key進(jìn)行二次排序 * 第四步:去除掉排序的Key,只保留排序后的結(jié)果 * */public class SecondarySortKeyApp { /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub SparkConf conf = new SparkConf().setAppName("SecondarySortKeyApp").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> line = sc.textFile("F:/helloSpark2.txt",1); JavaPairRDD<SecondarySortKey, String> pairs = line.mapToPair(new PairFunction<String, SecondarySortKey, String>() { public Tuple2<SecondarySortKey, String> call(String line) throws Exception { // TODO Auto-generated method stub return new Tuple2<SecondarySortKey, String>(new SecondarySortKey(Integer.valueOf(line.split(" ")[0]), Integer.valueOf(line.split(" ")[1])), line); } }); JavaPairRDD<SecondarySortKey, String> sortedPairs = pairs.sortByKey(false); //完成二次排序 //過濾掉排序后自定的Key,保留排序的結(jié)果 JavaRDD<String> values = sortedPairs.map(new Function<Tuple2<SecondarySortKey,String>, String>() { public String call(Tuple2<SecondarySortKey, String> pair) throws Exception { // TODO Auto-generated method stub return pair._2; } }); values.foreach(new VoidFunction<String>() { public void call(String line) throws Exception { // TODO Auto-generated method stub System.out.println(line); } }); sc.close(); }}