Original link: https://blog.csdn.net/aA518189/article/details/84032660
Overview
Join and coGroup are both operators in Flink's DataStream API for connecting two streams, but they differ in important ways. The recommendation is to prefer coGroup over Join wherever possible, because coGroup is more general. Let's take a quick look at how the two operators are used.
Window Join
DataStream, DataStream → DataStream
Joins two data streams on a given key over a common window.
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply { ... }
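To make `TumblingEventTimeWindows.of(Time.seconds(3))` concrete: every record is assigned to a window purely by its event timestamp. Below is a minimal plain-Java sketch of that bucketing (no Flink dependency; the class and `windowStart` helper are illustrative, not Flink API — Flink's real logic, which also handles offsets and negative timestamps, lives in `TimeWindow.getWindowStartWithOffset`):

```java
public class WindowAssignSketch {
    // Start of the tumbling window a timestamp falls into,
    // mirroring how Flink buckets records by event time.
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long size = 3000L; // 3-second tumbling windows
        // Timestamps 0..2999 share one window; 3000 starts the next one.
        System.out.println(windowStart(1500L, size)); // 0
        System.out.println(windowStart(2999L, size)); // 0
        System.out.println(windowStart(3000L, size)); // 3000
    }
}
```

Two records join only if they share both the key and the window computed this way.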
Window CoGroup
DataStream, DataStream → DataStream
Cogroups two data streams on a given key over a common window.
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply {}
CoGroupedStreams source code
JoinedStreams source code
Comparison conclusions
Conclusion 1: in where(), pass the key of stream T1 that should match stream T2; in equalTo(), pass the key of T2 that should match T1.
Conclusion 2: the biggest difference between join and coGroup is the parameter types their apply methods provide.
join's apply
coGroup's apply parameters
The two operators' apply methods take different parameter types. join's apply receives individual elements of the generic types T1 and T2, while coGroup's apply receives the iterables Iterable[T1] and Iterable[T2]. As a consequence, if your logic is not just per-record processing but needs to compare a record against earlier ones, perform more complex checks, or even sort the results, join's apply method becomes awkward, so coGroup is recommended.
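The difference can be illustrated without Flink: a windowed join invokes its function once per matching pair, whereas coGroup hands over all elements of both sides at once, so grouped logic such as sorting or left-join handling becomes trivial. A plain-Java sketch of the two calling conventions (class and method names are illustrative, not the Flink API):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class JoinVsCoGroup {
    // join-style: the function only ever sees one (left, right) pair at a time
    public static List<String> joinApply(List<Integer> left, List<Integer> right) {
        List<String> out = new ArrayList<>();
        for (int l : left)
            for (int r : right)
                out.add(l + "-" + r);
        return out;
    }

    // coGroup-style: the function receives both full groups for a key/window,
    // so it can sort, count, or emit rows even when one side is empty
    public static List<String> coGroupApply(List<Integer> left, List<Integer> right) {
        List<String> out = new ArrayList<>();
        List<Integer> sorted = new ArrayList<>(left);
        Collections.sort(sorted); // grouped logic: impossible in a pair-wise join function
        if (right.isEmpty()) {
            for (int l : sorted) out.add(l + "-null"); // left-join behaviour
        } else {
            for (int l : sorted)
                for (int r : right) out.add(l + "-" + r);
        }
        return out;
    }

    public static void main(String[] args) {
        // With an empty right side, the pair-wise join emits nothing...
        System.out.println(joinApply(List.of(2, 1), List.of()));    // []
        // ...while the grouped variant still emits left-join rows, sorted
        System.out.println(coGroupApply(List.of(2, 1), List.of())); // [1-null, 2-null]
    }
}
```

This is exactly why the left/right-join variants later in this article are written with coGroup: a join function is never even called when one side of the window is empty.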
Conclusion 3: the benefit of the apply method
To implement real-time stream computation in Flink, we can use a joinedStream or coGroupedStream. But performing more complex operations after the join, such as conditionals or iteration, is painful to express in SQL alone; it would take something like a PL/SQL block. Flink instead provides the apply method, in which users can write arbitrarily complex functions themselves.
Example: dual-stream join
Scala version
package flinkSQL

import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._

/**
 * Created by ${WangZhiHua} on 2018/11/13
 */
object JoinDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // read the incoming data
    val dataStream1 = env.readTextFile("C:/flink_data/scoket1.txt")
    val dataStream2 = env.readTextFile("C:/flink_data/scoket2.txt")
    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
    // wrap each parsed record in the StockTransaction case class
    val dataStreamMap1 = dataStream1.map(f => {
      val tokens1 = f.split(",")
      StockTransaction(tokens1(0), tokens1(1), tokens1(2).toDouble)
    })
      .assignAscendingTimestamps(f => format.parse(f.tx_time).getTime)
    // wrap each parsed record in the StockSnapshot case class
    val dataStreamMap2 = dataStream2.map(f => {
      val tokens2 = f.split(",")
      StockSnapshot(tokens2(0), tokens2(1), tokens2(2).toDouble)
    })
      .assignAscendingTimestamps(f => format.parse(f.md_time).getTime)

    /**
     * Dual-stream join,
     * scoped to a 3-second event-time tumbling window
     */
    val joinStream = dataStreamMap1.coGroup(dataStreamMap2)
      .where(_.tx_code)
      .equalTo(_.md_code)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    val innerJoinStream = joinStream.apply(new InnerJoinFunction)
    innerJoinStream.name("innerJoin").print()
    print("===================== end =========================")
    env.execute("join demo")
  }
}

// case classes wrapping the incoming records
case class StockTransaction(tx_time: String, tx_code: String, tx_value: Double)
case class StockSnapshot(md_time: String, md_code: String, md_value: Double)

// an inner-join function, implemented as a CoGroupFunction
class InnerJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] {
  override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double, String)]): Unit = {
    /**
     * Convert the Java Iterables into Scala collections;
     * Scala's collection operations are concise and efficient
     */
    import scala.collection.JavaConverters._
    val scalaT1 = T1.asScala.toList
    val scalaT2 = T2.asScala.toList
    /**
     * An inner join compares data under the same key within the same time window
     */
    if (scalaT1.nonEmpty && scalaT2.nonEmpty) {
      for (transaction <- scalaT1) {
        for (snapshot <- scalaT2) {
          out.collect(transaction.tx_code, transaction.tx_time, snapshot.md_time, transaction.tx_value, snapshot.md_value, "Inner Join Test")
        }
      }
    }
  }
}

class LeftJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] {
  override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double, String)]): Unit = {
    /**
     * Convert the Java Iterables into Scala collections;
     * Scala's collection operations are concise and efficient
     */
    import scala.collection.JavaConverters._
    val scalaT1 = T1.asScala.toList
    val scalaT2 = T2.asScala.toList
    /**
     * A left join compares data under the same key within the same time window.
     * Note: this emits only the unmatched left rows; combined with the
     * inner-join output it yields a full left outer join.
     */
    if (scalaT1.nonEmpty && scalaT2.isEmpty) {
      for (transaction <- scalaT1) {
        out.collect(transaction.tx_code, transaction.tx_time, "", transaction.tx_value, 0, "Left Join Test")
      }
    }
  }
}

class RightJoinFunction extends CoGroupFunction[StockTransaction, StockSnapshot, (String, String, String, Double, Double, String)] {
  override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double, String)]): Unit = {
    /**
     * Convert the Java Iterables into Scala collections;
     * Scala's collection operations are concise and efficient
     */
    import scala.collection.JavaConverters._
    val scalaT1 = T1.asScala.toList
    val scalaT2 = T2.asScala.toList
    /**
     * A right join compares data under the same key within the same time window.
     * As above, this emits only the unmatched right rows.
     */
    if (scalaT1.isEmpty && scalaT2.nonEmpty) {
      for (snapshot <- scalaT2) {
        out.collect(snapshot.md_code, "", snapshot.md_time, 0, snapshot.md_value, "Right Join Test")
      }
    }
  }
}
Java version
import java.text.ParseException;
import java.text.SimpleDateFormat;

import javax.annotation.Nullable;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class DoubleJoin {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source1 = env.readTextFile("/Users/apple/Downloads/1.txt");
        DataStreamSource<String> source2 = env.readTextFile("/Users/apple/Downloads/2.txt");
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        /**
         * Stream 1
         */
        SingleOutputStreamOperator<Row> stream1 = source1.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String value) throws Exception {
                String[] split = value.split(",");
                String timeStamp = split[0];
                String name = split[1];
                String city = split[2];
                Row row = new Row(3);
                row.setField(0, timeStamp);
                row.setField(1, name);
                row.setField(2, city);
                return row;
            }
        }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Row>() {
            long currentMaxTimestamp = 0L;
            long maxOutOfOrderness = 10000L; // allow at most 10s of out-of-order events
            Watermark watermark = null;

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                return watermark;
            }

            @Override
            public long extractTimestamp(Row element, long previousElementTimestamp) {
                long timeStamp = 0;
                try {
                    // parse the event-time string into epoch milliseconds
                    timeStamp = simpleDateFormat.parse(element.getField(0).toString()).getTime();
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                currentMaxTimestamp = Math.max(timeStamp, currentMaxTimestamp);
                return timeStamp;
            }
        });
        stream1.print();
        /**
         * Stream 2
         */
        SingleOutputStreamOperator<Row> stream2 = source2.map(new MapFunction<String, Row>() {
            @Override
            public Row map(String value) throws Exception {
                String[] split = value.split(",");
                String timeStamp = split[0];
                String name = split[1];
                String age = split[2];
                String school = split[3];
                Row row = new Row(4);
                row.setField(0, timeStamp);
                row.setField(1, name);
                row.setField(2, age);
                row.setField(3, school);
                return row;
            }
        }).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Row>() {
            long currentMaxTimestamp = 0L;
            long maxOutOfOrderness = 10000L; // allow at most 10s of out-of-order events
            Watermark watermark = null;

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                return watermark;
            }

            @Override
            public long extractTimestamp(Row element, long previousElementTimestamp) {
                long timeStamp = 0;
                try {
                    // parse the event-time string into epoch milliseconds
                    timeStamp = simpleDateFormat.parse(element.getField(0).toString()).getTime();
                } catch (ParseException e) {
                    e.printStackTrace();
                }
                currentMaxTimestamp = Math.max(timeStamp, currentMaxTimestamp);
                return timeStamp;
            }
        });
        stream2.print();
        /**
         * Dual-stream join
         */
        stream1.coGroup(stream2)
                .where(new KeySelector<Row, String>() {
                    @Override
                    public String getKey(Row value) throws Exception {
                        System.out.println("stream1" + value.toString());
                        return value.getField(1).toString();
                    }
                })
                .equalTo(new KeySelector<Row, String>() {
                    @Override
                    public String getKey(Row value) throws Exception {
                        System.out.println("stream2" + value.toString());
                        return value.getField(1).toString();
                    }
                }).window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<Row, Row, Row>() {
                    @Override
                    public void coGroup(Iterable<Row> first, Iterable<Row> second, Collector<Row> out) throws Exception {
                        first.forEach(t ->
                                second.forEach(x -> {
                                    // pick the fields we need from both sides of the join
                                    Row row = new Row(3);
                                    Object field1 = t.getField(0);
                                    Object field2 = x.getField(1);
                                    Object field3 = x.getField(2);
                                    // wrap them in a Row
                                    row.setField(0, field1);
                                    row.setField(1, field2);
                                    row.setField(2, field3);
                                    out.collect(row);
                                }));
                        System.out.println("join" + first.toString());
                    }
                }).printToErr();
        try {
            env.execute("ddcddd");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
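The watermark logic used in the assigners above can be checked in isolation: the periodic watermark always trails the highest event timestamp seen so far by the allowed out-of-orderness. A plain-Java sketch of that behavior (no Flink dependency; the class name is illustrative):

```java
public class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness; // e.g. 10000 ms, as in the example above
    private long currentMaxTimestamp = 0L;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Mirrors extractTimestamp(): track the highest event time seen so far
    public long extractTimestamp(long eventTimeMs) {
        currentMaxTimestamp = Math.max(eventTimeMs, currentMaxTimestamp);
        return eventTimeMs;
    }

    // Mirrors getCurrentWatermark(): the watermark lags the max by the bound
    public long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch w = new BoundedOutOfOrdernessSketch(10000L);
        w.extractTimestamp(25000L); // max = 25000
        w.extractTimestamp(18000L); // a late element; the max is unchanged
        System.out.println(w.currentWatermark()); // 15000
    }
}
```

Only windows whose end falls at or before the current watermark may fire, which is what makes the note below about triggers relevant when testing.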
Note: when testing, prefer a CountTrigger to fire the window. If you rely on the default EventTimeTrigger, you also need to supply proper timestamps; otherwise the test may look like it is receiving no data, when in fact the window simply never fired.
Usage example: fire the window once for every ten records
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.trigger(CountTrigger.of(10))
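The idea behind `CountTrigger.of(10)` can be sketched without Flink: the trigger fires as soon as a window pane has buffered N elements, regardless of watermarks. A minimal plain-Java illustration (class and method names are hypothetical, not the Flink trigger API):

```java
public class CountTriggerSketch {
    private final long maxCount;
    private long count = 0L;

    public CountTriggerSketch(long maxCount) {
        this.maxCount = maxCount;
    }

    // Called for each element added to the window pane; returns true
    // ("FIRE") once the threshold is reached, then resets the counter,
    // as Flink's CountTrigger clears its count state on firing.
    public boolean onElement() {
        count++;
        if (count >= maxCount) {
            count = 0L;
            return true;
        }
        return false; // "CONTINUE"
    }

    public static void main(String[] args) {
        CountTriggerSketch trigger = new CountTriggerSketch(3);
        System.out.println(trigger.onElement()); // false
        System.out.println(trigger.onElement()); // false
        System.out.println(trigger.onElement()); // true: third element fires the window
    }
}
```

This is why a count trigger is convenient in tests: firing depends only on how many records arrive, not on event time advancing.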
————————————————
Copyright notice: this is an original article by CSDN blogger "阿華田512", licensed under CC 4.0 BY-SA. Please include the original link (above) and this notice when reposting.