免费视频淫片aa毛片_日韩高清在线亚洲专区vr_日韩大片免费观看视频播放_亚洲欧美国产精品完整版

打開APP
userphoto
未登錄

開通VIP,暢享免費電子書等14項超值服

開通VIP
flink實戰(zhàn)——雙流join之Join和coGroup的區(qū)別和應(yīng)用

本文鏈接:https://blog.csdn.net/aA518189/article/details/84032660

簡介

Join和coGroup都是flinkSQL中用于連接多個流的算子,但是有一定的區(qū)別,推薦能使用coGroup不要使用Join,因為coGroup更強大。下面讓我們簡單看一下兩個算子的用法

Window Join

DataStream,DataStream→DataStream

在給定密鑰和公共窗口上連接兩個數(shù)據(jù)流。

dataStream.join(otherStream)

    .where(<key selector>).equalTo(<key selector>)

    .window(TumblingEventTimeWindows.of(Time.seconds(3)))

    .apply { ... }

Window CoGroup

DataStream,DataStream→DataStream

在給定密鑰和公共窗口上對兩個數(shù)據(jù)流進行Cogroup。

dataStream.coGroup(otherStream)

    .where(0).equalTo(1)

    .window(TumblingEventTimeWindows.of(Time.seconds(3)))

    .apply {}

 CoGrouped源碼

joinedStream源碼

對比結(jié)論

結(jié)論一:.使用時,where里面要傳入數(shù)據(jù)流T1要與數(shù)據(jù)流T2匹配的key,equalTo中傳入T2要和T1匹配的key。

結(jié)論二:join和coGroup的最大區(qū)別就是apply方法提供的參數(shù)類型不一樣,

join的apply

coGroup的apply參數(shù)

兩種算子apply方法中的參數(shù)類型不一樣,join中提供的apply方法,參數(shù)是T1與T2泛型類型。而coGroup中提供的apply方法,參數(shù)是Iterator[T1]與Iterator[2]迭代器,基于這2種方式,如果我們的邏輯不僅僅是對一條record做處理,而是要與上一record或更復(fù)雜的判斷與比較,甚至是對結(jié)果排序,那么join中的apply方法顯得比較困難,所以推薦使用coGroup。 

結(jié)論三:apply方法的好處

我們想要在Flink中實現(xiàn)實時的流計算,就可以通過joinedStream或coGroupedStream來實現(xiàn)。但是在join之后實施更復(fù)雜的運算,例如判斷、迭代等,僅僅通過SQL實現(xiàn),恐怕會很麻煩,只能通過PL/SQL塊來實現(xiàn),而Flink提供了apply方法,用戶可以自己編寫復(fù)雜的函數(shù)來實現(xiàn)。

案例 雙流jion

scala版

package flinkSQL

import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.CoGroupFunction

import org.apache.flink.streaming.api.TimeCharacteristic

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import org.apache.flink.util.Collector

import org.apache.flink.api.scala._

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows

import org.apache.flink.streaming.api.windowing.time.Time

import org.apache.flink.table.api.scala._

/**

  * Created by  ${WangZhiHua} on 2018/11/13

  */

object JoinDemo {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //獲取接口傳送的數(shù)據(jù)

    val dataStream1 = env.readTextFile("C:/flink_data/scoket1.txt")

    val dataStream2 = env.readTextFile("C:/flink_data/scoket2.txt")

    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

   //使用樣例類StockTransaction封裝獲取的數(shù)據(jù)

    val dataStreamMap1 = dataStream1.map(f => {

      val tokens1 = f.split(",")

      StockTransaction(tokens1(0), tokens1(1), tokens1(2).toDouble)

    })

      .assignAscendingTimestamps(f => format.parse(f.tx_time).getTime)

    //使用樣例類StockSnapshot封裝獲取的數(shù)據(jù)

    val dataStreamMap2 = dataStream2.map(f => {

      val tokens2 = f.split(",")

      StockSnapshot(tokens2(0), tokens2(1), tokens2(2).toDouble)

    })

      .assignAscendingTimestamps(f => format.parse(f.md_time).getTime)

    /**

      * 進行雙流join

      * 限定范圍是:3秒鐘的Event time時間窗口

      */

    val joinStream = dataStreamMap1.coGroup(dataStreamMap2)

      .where(_.tx_code)

      .equalTo(_.md_code)

      .window(TumblingEventTimeWindows.of(Time.seconds(3)))

      val innerJoinStream = joinStream.apply(new InnerJoinFunction)

     innerJoinStream.name("innerJoin").print()

    print("===================== end =========================")

    env.execute("join demo")

  }

}

//定義樣例類封裝接收的數(shù)據(jù)

case class StockTransaction(tx_time:String, tx_code:String,tx_value:Double)

case class StockSnapshot(md_time:String, md_code:String,md_value:Double)

//定義一個內(nèi)連接函數(shù),繼承CoCroup

class InnerJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)]{

  override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double,String)]): Unit = {

    /**

      * 將Java中的Iterable對象轉(zhuǎn)換為Scala的Iterable

      * scala的集合操作效率高,簡潔

      */

    import scala.collection.JavaConverters._

    val scalaT1 = T1.asScala.toList

    val scalaT2 = T2.asScala.toList

    /**

      * Inner Join要比較的是同一個key下,同一個時間窗口內(nèi)的數(shù)據(jù)

      */

    if(scalaT1.nonEmpty && scalaT2.nonEmpty){

      for(transaction <- scalaT1){

        for(snapshot <- scalaT2){

          out.collect(transaction.tx_code,transaction.tx_time, snapshot.md_time,transaction.tx_value,snapshot.md_value,"Inner Join Test")

        }

      }

    }

  }

}

class LeftJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {

  override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {

    /**

      * 將Java中的Iterable對象轉(zhuǎn)換為Scala的Iterable

      * scala的集合操作效率高,簡潔

      */

    import scala.collection.JavaConverters._

    val scalaT1 = T1.asScala.toList

    val scalaT2 = T2.asScala.toList

    /**

      * Left Join要比較的是同一個key下,同一個時間窗口內(nèi)的數(shù)據(jù)

      */

    if(scalaT1.nonEmpty && scalaT2.isEmpty){

      for(transaction <- scalaT1){

        out.collect(transaction.tx_code,transaction.tx_time, "",transaction.tx_value,0,"Left Join Test")

      }

    }

  }

}

class RightJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {

  override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {

    /**

      * 將Java中的Iterable對象轉(zhuǎn)換為Scala的Iterable

      * scala的集合操作效率高,簡潔

      */

    import scala.collection.JavaConverters._

    val scalaT1 = T1.asScala.toList

    val scalaT2 = T2.asScala.toList

    /**

      * Right Join要比較的是同一個key下,同一個時間窗口內(nèi)的數(shù)據(jù)

      */

    if(scalaT1.isEmpty && scalaT2.nonEmpty){

      for(snapshot <- scalaT2){

        out.collect(snapshot.md_code, "",snapshot.md_time,0,snapshot.md_value,"Right Join Test")

      }

    }

  }

}

java版

public class DoubleJoin {

    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source1 = env.readTextFile("/Users/apple/Downloads/1.txt");

        DataStreamSource<String> source2 = env.readTextFile("/Users/apple/Downloads/2.txt");

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        /**

         * 數(shù)據(jù)流1

         */

        SingleOutputStreamOperator<Row> stream1 = source1.map(new MapFunction<String, Row>() {

            @Override

            public Row map(String value) throws Exception {

                String[] split = value.split(",");

                String timeStamp = split[0];

                String name = split[1];

                String city = split[2];

                Row row = new Row(3);

                row.setField(0,timeStamp);

                row.setField(1,name);

                row.setField(2,city);

                return row;

            }

        }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Row>() {

             long  currentMaxTimestamp = 0L;

             long  maxOutOfOrderness = 10000L;

             Watermark watermark=null;

            //最大允許的亂序時間是10s

             @Nullable

             @Override

             public Watermark getCurrentWatermark() {

                watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);

                 return watermark;

             }

             @Override

             public long extractTimestamp(Row element, long previousElementTimestamp) {

                 long timeStamp = 0;

                 try {

                     timeStamp = simpleDateFormat.parse(element.getField(0).toString()).getDate();

                 } catch (ParseException e) {

                     e.printStackTrace();

                 }

                 currentMaxTimestamp = Math.max(timeStamp, currentMaxTimestamp);

                     return timeStamp ;

             }

         }

        );

        stream1.print();

        /**

         * 數(shù)據(jù)流2

         */

        SingleOutputStreamOperator<Row> stream2 = source2.map(new MapFunction<String, Row>() {

            @Override

            public Row map(String value) throws Exception {

                String[] split = value.split(",");

                String timeStamp = split[0];

                String name = split[1];

                String age = split[2];

                String school= split[3];

                Row row = new Row(4);

                row.setField(0,timeStamp);

                row.setField(1,name);

                row.setField(2,age);

                row.setField(3,school);

                return row;

            }

        }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Row>() {

            long  currentMaxTimestamp = 0L;

            long  maxOutOfOrderness = 10000L;

            Watermark watermark=null;

            //最大允許的亂序時間是10s

            @Nullable

            @Override

            public Watermark getCurrentWatermark() {

                watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);

                return watermark;

            }

            @Override

            public long extractTimestamp(Row element, long previousElementTimestamp) {

                long timeStamp = 0;

                try {

                    timeStamp = simpleDateFormat.parse(element.getField(0).toString()).getDate();

                } catch (ParseException e) {

                    e.printStackTrace();

                }

                currentMaxTimestamp = Math.max(timeStamp, currentMaxTimestamp);

                return timeStamp ;

            }

        });

         stream2.print();

        /**

         * 雙流join

         */

        stream1.coGroup(stream2)

                .where(new KeySelector<Row, String>() {

                    @Override

                    public String getKey(Row value) throws Exception {

                        System.out.println("stream1"+value.toString());

                        return value.getField(1).toString();

                    }

                })

                .equalTo(new KeySelector<Row, String>() {

                    @Override

                    public String getKey(Row value) throws Exception {

                        System.out.println("stream2"+value.toString());

                        return value.getField(1).toString();

                    }

                }).window(TumblingEventTimeWindows.of(Time.seconds(5)))

                .apply(new CoGroupFunction<Row, Row, Row>() {

                    @Override

                    public void coGroup(Iterable<Row> first, Iterable<Row> second, Collector<Row> out) throws Exception {

                        first.forEach(t ->

                                second.forEach(x ->

                                        {

                                            //雙流join  選取需要的字段

                                            Row row = new Row(3);

                                            Object field1 = t.getField(0);

                                            Object field2 = x.getField(1);

                                            Object field3 = x.getField(2);

                                            //使用row封裝數(shù)據(jù)

                                            row.setField(0, field1);

                                            row.setField(1, field2);

                                            row.setField(2, field3);

                                            out.collect(row);

                                        }

                                ));

                        System.out.println("join"+first.toString());

                    }

                }).printToErr();

        try {

            env.execute("ddcddd");

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

}

注意:我們在測試的時候盡量使用CountTrigge去觸發(fā)窗口執(zhí)行,如果使用默認的EventTimeTrigger,我們還需要設(shè)置具體的時間戳,不然可能測試時出現(xiàn)獲取不到數(shù)據(jù)的假象,其實是窗口一直沒觸發(fā)。

使用案例:十條數(shù)據(jù)就觸發(fā)一次窗口的執(zhí)行

.window(TumblingEventTimeWindows.of(Time.seconds(5)))

.trigger(CountTrigger.of(10))

————————————————

版權(quán)聲明:本文為CSDN博主「阿華田512」的原創(chuàng)文章,遵循 CC 4.0 BY-SA 版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明。

原文鏈接:https://blog.csdn.net/aA518189/article/details/84032660

本站僅提供存儲服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊舉報。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
大數(shù)據(jù)開發(fā)-從cogroup的實現(xiàn)來看join是寬依賴還是窄依賴
idea使用maven快速構(gòu)建flink骨架項目
3000門徒內(nèi)部訓(xùn)練絕密視頻(泄密版)第1課:大數(shù)據(jù)最火爆語言Scala光速入門
大數(shù)據(jù)Flink的SQL在API中進行操作
Spark 初體驗
【SCALA】1、我要開始學(xué)習(xí)scala啦
更多類似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號成功
后續(xù)可登錄賬號暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點擊這里聯(lián)系客服!

聯(lián)系客服