免费视频淫片aa毛片_日韩高清在线亚洲专区vr_日韩大片免费观看视频播放_亚洲欧美国产精品完整版

打開APP
userphoto
未登錄

開通VIP,暢享免費電子書等14項超值服

開通VIP
第103講: 動手實戰(zhàn)聯(lián)合使用Spark Streaming、Broadcast、Accumulator實現(xiàn)在線黑名單過濾和計數

有興趣想學習國內整套Spark+Spark Streaming+Machine learning頂級課程的,可加我qq  471186150。共享視頻,性價比超高!

1:廣播可以自定義,例如你自定義廣播里面的內容,就有很多你可以自定義的操作。尤其是結合了Broadcast和Accumulator的時候,他可以實現(xiàn)一些非常復雜的功能。

2:廣播和計數器在企業(yè)的實際開發(fā)中,非常重要,主要是可以自定義,自定義的時候可以實現(xiàn)非常復雜的邏輯。計數器Accumulator可以計數黑名單。黑名單數據可以寫在廣播里面

3:下面直接上代碼,當然,這只是初步的使用,廣播和計算器的自定義,絕對是高端的spark技術。它們倆者結合自定義會發(fā)揮非常強大的作用。很多一線互聯(lián)網公司,它們很多復雜的業(yè)務,都需要聯(lián)合使用和自定義廣播和計數器。

package com.dt.streaming;import java.io.BufferedReader;import java.io.InputStreamReader;import java.net.ConnectException;import java.net.Socket;import java.util.Arrays;import java.util.List;import org.apache.spark.Accumulator;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.broadcast.Broadcast;import org.apache.spark.storage.StorageLevel;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.Time;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import org.apache.spark.streaming.receiver.Receiver;import scala.Tuple2;public class SparkStreamingBroadcastAccumulator {   /*   *第103講    */   //這個是基于原子型的變量,保存黑名單   private static volatile Broadcast<List<String>> broadcastList = null;   private static volatile Accumulator<Integer> accumulator = null;   public static void main(String[] args) {      SparkConf conf = new SparkConf().setMaster("local[2]").            setAppName("WordCountOnline");      JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(15));            /**       * 實例化廣播,使用Broadcast廣播黑名單到每個Executor中,廣播是基于SparkContext的。而不是StreamingContext。       * 沒有action,廣播是不能發(fā)出的       */      broadcastList = jsc.sparkContext().broadcast(Arrays.asList("Hadoop","Mahout","Hive"));            /**       * 全局計數器,用于統(tǒng)計在線過濾了多少個黑名單       * 第一個參數計數初始值肯定是0,第2個參數,accumulator的name       */      accumulator = jsc.sparkContext().accumulator(0, "OnlineBlacklistCounter");                  JavaReceiverInputDStream lines = jsc.socketTextStream("master1", 9999);               JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {         @Override         public Tuple2<String, Integer> call(String word) throws Exception {            return new Tuple2<String, Integer>(word, 1);         }      });                  JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { //對相同的Key,進行Value的累計(包括Local和Reducer級別同時Reduce)                  @Override         public Integer call(Integer v1, Integer v2) throws Exception {            return v1 + v2;         }      });      //過濾黑名單      wordsCount.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void> (){         @Override         public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {            //對數據rdd進行過濾            rdd.filter(new Function<Tuple2<String, Integer>,Boolean>(){               @Override               public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {                  //判斷現(xiàn)在循環(huán)的每個key,是否是在黑名單中                  if (broadcastList.value().contains(wordPair._1)){                     accumulator.add(wordPair._2);//這里添加過濾掉的黑名單的個數,用于全局通知                     return false;//包含,return  false,過濾掉                  } else {                     return true;//不包含,return true,不過濾                  }                                                   }                           }).collect();//action觸發(fā)下            //連接上nc -lk 9999,輸入Hadoop,Spark,Hive,Scala,就會輸出2次,是累加的。代表總共過濾了2次黑名單            System.out.println(" BlackList appeared : " + accumulator.value() + " times");            return null;         }               });                  /*       * Spark Streaming執(zhí)行引擎也就是Driver開始運行,Driver啟動的時候是位于一條新的線程中的,當然其內部有消息循環(huán)體,用于       * 接受應用程序本身或者Executor中的消息;       */      jsc.start();            jsc.awaitTermination();      jsc.close();   }}

本站僅提供存儲服務,所有內容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權內容,請點擊舉報。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
Spark版本定制班第1課-Frank
070 DStream中的transform和foreachRDD函數
運行第一個SparkStreaming程序
用spark streaming實時讀取hdfs數據并寫入elasticsearch中
利用Flink stream從kafka中寫數據到mysql
Java 億萬級數據導出到Excel
更多類似文章 >>
生活服務
分享 收藏 導長圖 關注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權!
如果VIP功能使用有故障,
可點擊這里聯(lián)系客服!

聯(lián)系客服