有興趣想學習國內整套Spark+Spark Streaming+Machine learning頂級課程的,可加我qq 471186150。共享視頻,性價比超高!
1:廣播可以自定義,例如你自定義廣播里面的內容,就有很多你可以自定義的操作。尤其是結合了Broadcast和Accumulator的時候,他可以實現(xiàn)一些非常復雜的功能。
2:廣播和計數器在企業(yè)的實際開發(fā)中,非常重要,主要是可以自定義,自定義的時候可以實現(xiàn)非常復雜的邏輯。計數器Accumulator可以計數黑名單。黑名單數據可以寫在廣播里面
3:下面直接上代碼,當然,這只是初步的使用,廣播和計算器的自定義,絕對是高端的spark技術。它們倆者結合自定義會發(fā)揮非常強大的作用。很多一線互聯(lián)網公司,它們很多復雜的業(yè)務,都需要聯(lián)合使用和自定義廣播和計數器。
package com.dt.streaming;import java.io.BufferedReader;import java.io.InputStreamReader;import java.net.ConnectException;import java.net.Socket;import java.util.Arrays;import java.util.List;import org.apache.spark.Accumulator;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.broadcast.Broadcast;import org.apache.spark.storage.StorageLevel;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.Time;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import org.apache.spark.streaming.receiver.Receiver;import scala.Tuple2;public class SparkStreamingBroadcastAccumulator { /* *第103講 */ //這個是基于原子型的變量,保存黑名單 private static volatile Broadcast<List<String>> broadcastList = null; private static volatile Accumulator<Integer> accumulator = null; public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local[2]"). setAppName("WordCountOnline"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(15)); /** * 實例化廣播,使用Broadcast廣播黑名單到每個Executor中,廣播是基于SparkContext的。而不是StreamingContext。 * 沒有action,廣播是不能發(fā)出的 */ broadcastList = jsc.sparkContext().broadcast(Arrays.asList("Hadoop","Mahout","Hive")); /** * 全局計數器,用于統(tǒng)計在線過濾了多少個黑名單 * 第一個參數計數初始值肯定是0,第2個參數,accumulator的name */ accumulator = jsc.sparkContext().accumulator(0, "OnlineBlacklistCounter"); JavaReceiverInputDStream lines = jsc.socketTextStream("master1", 9999); JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { //對相同的Key,進行Value的累計(包括Local和Reducer級別同時Reduce) @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); //過濾黑名單 wordsCount.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void> (){ @Override public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception { //對數據rdd進行過濾 rdd.filter(new Function<Tuple2<String, Integer>,Boolean>(){ @Override public Boolean call(Tuple2<String, Integer> wordPair) throws Exception { //判斷現(xiàn)在循環(huán)的每個key,是否是在黑名單中 if (broadcastList.value().contains(wordPair._1)){ accumulator.add(wordPair._2);//這里添加過濾掉的黑名單的個數,用于全局通知 return false;//包含,return false,過濾掉 } else { return true;//不包含,return true,不過濾 } } }).collect();//action觸發(fā)下 //連接上nc -lk 9999,輸入Hadoop,Spark,Hive,Scala,就會輸出2次,是累加的。代表總共過濾了2次黑名單 System.out.println(" BlackList appeared : " + accumulator.value() + " times"); return null; } }); /* * Spark Streaming執(zhí)行引擎也就是Driver開始運行,Driver啟動的時候是位于一條新的線程中的,當然其內部有消息循環(huán)體,用于 * 接受應用程序本身或者Executor中的消息; */ jsc.start(); jsc.awaitTermination(); jsc.close(); }}