flink-user-zh mailing list archives

From "忝忝向仧" <153488...@qq.com>
Subject Flink two-stream join question
Date Sat, 04 Apr 2020 09:56:43 GMT
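Hi all, I ran into a problem with a two-stream join in Flink. Could someone explain it? Thanks.
ds1 and ds2 each read one stream from Kafka. Both use event time and watermarks, and they are joined over a 3 s tumbling window. The code is further below.
When the join produces its output, why does the window already fire on the second record?
My understanding is that a window fires when watermark_time >= window_end_time. So the window should only fire after the record with timestamp 1000000057000 arrives, yet it fires as soon as the 1000000056000 record arrives. Why?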
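For reference, the window arithmetic involved can be reproduced outside Flink. The sketch below is a standalone reimplementation, not Flink code: `TumblingWindowMath`, `windowStart` and `fires` are hypothetical names. The start formula mirrors `TimeWindow.getWindowStartWithOffset` in Flink 1.x, and the firing check mirrors `EventTimeTrigger`, which fires once the watermark reaches `window.maxTimestamp()` (window end minus 1 ms). Both are worth checking against the Flink version actually in use.

```java
// Standalone sketch (no Flink dependency) of the arithmetic Flink 1.x uses
// for tumbling event-time windows. Illustrative only.
public class TumblingWindowMath {

    // Mirrors TimeWindow.getWindowStartWithOffset: windows are aligned to the
    // epoch (plus offset), not to the first record that arrives.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // A window [start, end) fires once watermark >= end - 1 (its max timestamp).
    static boolean fires(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long size = 3_000L;            // Time.seconds(3)
        long ts = 1_000_000_055_000L;  // event time of tom1 / jerry1
        long start = windowStart(ts, 0L, size);
        long end = start + size;
        // prints: window = [1000000053000, 1000000056000)
        System.out.println("window = [" + start + ", " + end + ")");
        // prints: true
        System.out.println(fires(1_000_000_056_000L, end));
        // prints: false
        System.out.println(fires(1_000_000_055_000L, end));
    }
}
```

Under these assumptions the 1000000055000 records land in [1000000053000, 1000000056000) rather than a window ending at 1000000057000, so a watermark of 1000000056000 is already past that window's max timestamp. If a different alignment is wanted, `TumblingEventTimeWindows.of(size, offset)` accepts an offset.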



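The code:
import javax.annotation.Nullable;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

// env (a StreamExecutionEnvironment using event time) and properties (the Kafka
// consumer config) are set up elsewhere and are not shown in the message.
DataStream<String> stream1 = env
        .addSource(new FlinkKafkaConsumer09<String>("stream1", new SimpleStringSchema(),
                properties).setStartFromLatest())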
        .assignTimestampsAndWatermarks(
                new AssignerWithPeriodicWatermarks<String>() {
                    long currentTimeStamp = 0L;  // highest event time seen so far
                    long maxDelayAllowed = 0L;   // allowed out-of-orderness (none)
                    long currentWaterMark;
                    // Called periodically by Flink (every autoWatermarkInterval ms),
                    // not once per record.
                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        currentWaterMark = currentTimeStamp - maxDelayAllowed;
                        return new Watermark(currentWaterMark);
                    }
                    @Override
                    public long extractTimestamp(String s, long l) {
                        // Records look like "<key> <name> <epochMillis>"
                        String[] arr = s.split(" ");
                        long timeStamp = Long.parseLong(arr[2]);
                        currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
                        System.out.println("currentTimeStamp: " + currentTimeStamp + ",Key:"
                                + arr[0] + ",EventTime:" + timeStamp + ",previous watermark:" + currentWaterMark);
                        return timeStamp;
                    }
                }
        );

DataStream<Tuple3<String, String, String>> ds1 = stream1.map(
        new MapFunction<String, Tuple3<String, String, String>>() {
    @Override
    public Tuple3<String, String, String> map(String s1) throws Exception {
        String[] arr = s1.split(" ");
        return Tuple3.of(arr[0], arr[1], arr[2]);
    }
});

ds1.print();
DataStream<String> stream2 = env
        .addSource(new FlinkKafkaConsumer09<String>("stream2", new SimpleStringSchema(),
                properties).setStartFromLatest())
        .assignTimestampsAndWatermarks(
                // Same periodic assigner as for stream1
                new AssignerWithPeriodicWatermarks<String>() {
                    long currentTimeStamp = 0L;
                    long maxDelayAllowed = 0L;
                    long currentWaterMark;
                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        currentWaterMark = currentTimeStamp - maxDelayAllowed;
                        return new Watermark(currentWaterMark);
                    }
                    @Override
                    public long extractTimestamp(String s, long l) {
                        String[] arr = s.split(" ");
                        long timeStamp = Long.parseLong(arr[2]);
                        currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
                        System.out.println("currentTimeStamp: " + currentTimeStamp + ",Key:"
                                + arr[0] + ",EventTime:" + timeStamp + ",previous watermark:" + currentWaterMark);
                        return timeStamp;
                    }
                }
        );

DataStream<Tuple3<String, String, String>> ds2 = stream2.map(
        new MapFunction<String, Tuple3<String, String, String>>() {
    @Override
    public Tuple3<String, String, String> map(String s2) throws Exception {
        String[] arr = s2.split(" ");
        return Tuple3.of(arr[0], arr[1], arr[2]);
    }
});
ds2.print();
ds1.join(ds2).where(new KeySelector<Tuple3<String, String, String>, String>() {
    @Override
    public String getKey(Tuple3<String, String, String> value) throws Exception {
        return value.f0;
    }
}).equalTo(new KeySelector<Tuple3<String, String, String>, String>() {
    @Override
    public String getKey(Tuple3<String, String, String> value) throws Exception {
        return value.f0;
    }
})
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply(new JoinFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, String>() {
    @Override
    public String join(Tuple3<String, String, String> value1, Tuple3<String, String, String> value2)
            throws Exception {
        return value1.f1 + "=" + value2.f1;
    }
}).print();
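To see which watermark each record produces, the assigner's logic can be modelled in plain Java. This is a hypothetical sketch: `PeriodicWatermarkModel` is not a Flink class, the input strings follow the `<key> <name> <epochMillis>` layout implied by the printed tuples, and the model samples the watermark after every record, whereas Flink calls `getCurrentWatermark()` periodically (every `autoWatermarkInterval` ms), so real watermarks may trail the records slightly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the periodic assigner in the job above (not a Flink class).
public class PeriodicWatermarkModel {
    private final long maxDelayAllowed;
    private long currentTimeStamp = 0L; // highest event time seen so far

    PeriodicWatermarkModel(long maxDelayAllowed) {
        this.maxDelayAllowed = maxDelayAllowed;
    }

    // Same logic as extractTimestamp(): records look like "<key> <name> <epochMillis>".
    long extractTimestamp(String record) {
        long ts = Long.parseLong(record.split(" ")[2]);
        currentTimeStamp = Math.max(ts, currentTimeStamp);
        return ts;
    }

    // Same logic as getCurrentWatermark().
    long currentWatermark() {
        return currentTimeStamp - maxDelayAllowed;
    }

    // Simplification: samples the watermark after every record, whereas Flink
    // polls getCurrentWatermark() on a timer (autoWatermarkInterval).
    static List<Long> watermarksAfterEachRecord(List<String> records, long maxDelay) {
        PeriodicWatermarkModel model = new PeriodicWatermarkModel(maxDelay);
        List<Long> watermarks = new ArrayList<>();
        for (String r : records) {
            model.extractTimestamp(r);
            watermarks.add(model.currentWatermark());
        }
        return watermarks;
    }

    public static void main(String[] args) {
        List<String> stream1 = Arrays.asList("1 tom1 1000000055000", "1 tom2 1000000056000");
        // prints [1000000055000, 1000000056000]
        System.out.println(watermarksAfterEachRecord(stream1, 0L));
    }
}
```

With `maxDelayAllowed = 0` the watermark simply equals the largest event time seen, so the record at 1000000056000 by itself moves the watermark to 1000000056000; with a 2000 ms delay the same record would only move it to 1000000054000.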
Output:
currentTimeStamp: 1000000055000,Key:1,EventTime:1000000055000,previous watermark:0
4> (1,tom1,1000000055000)
currentTimeStamp: 1000000056000,Key:1,EventTime:1000000056000,previous watermark:1000000055000
4> (1,tom2,1000000056000)
currentTimeStamp: 1000000055000,Key:1,EventTime:1000000055000,previous watermark:0
3> (1,jerry1,1000000055000)
currentTimeStamp: 1000000056000,Key:1,EventTime:1000000056000,previous watermark:1000000055000
3> (1,jerry2,1000000056000)
2> tom1=jerry1
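The output can be reproduced with a small plain-Java model of the window join. It is a simplified sketch, not how Flink implements the operator: `WindowJoinModel` and `Event` are hypothetical names, windows are assumed to be epoch-aligned with offset 0, the operator's watermark is taken as the minimum of the two input watermarks, and parallelism, allowed lateness and state cleanup are ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of a tumbling event-time window join (not Flink code).
// Ignores parallelism, allowed lateness and state cleanup.
public class WindowJoinModel {

    // One input record, mirroring the Tuple3 (key, name, event time).
    static final class Event {
        final String key;
        final String name;
        final long ts;

        Event(String key, String name, long ts) {
            this.key = key;
            this.name = name;
            this.ts = ts;
        }
    }

    // Epoch-aligned window start (offset 0).
    static long windowStart(long ts, long size) {
        return ts - (ts + size) % size;
    }

    // Buckets events by window start, then by key.
    private static Map<Long, Map<String, List<Event>>> bucket(List<Event> events, long size) {
        Map<Long, Map<String, List<Event>>> byWindow = new TreeMap<>();
        for (Event ev : events) {
            byWindow.computeIfAbsent(windowStart(ev.ts, size), w -> new TreeMap<>())
                    .computeIfAbsent(ev.key, k -> new ArrayList<>())
                    .add(ev);
        }
        return byWindow;
    }

    // Emits "left=right" for every same-key pair in every window that has fired.
    static List<String> join(List<Event> left, List<Event> right, long size,
                             long leftWatermark, long rightWatermark) {
        // A two-input operator advances on the minimum of its input watermarks.
        long watermark = Math.min(leftWatermark, rightWatermark);
        Map<Long, Map<String, List<Event>>> l = bucket(left, size);
        Map<Long, Map<String, List<Event>>> r = bucket(right, size);
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, Map<String, List<Event>>> w : l.entrySet()) {
            long maxTimestamp = w.getKey() + size - 1;
            if (watermark < maxTimestamp) {
                continue; // window has not fired yet
            }
            Map<String, List<Event>> rightByKey = r.getOrDefault(w.getKey(), Collections.emptyMap());
            for (Map.Entry<String, List<Event>> k : w.getValue().entrySet()) {
                for (Event a : k.getValue()) {
                    for (Event b : rightByKey.getOrDefault(k.getKey(), Collections.emptyList())) {
                        out.add(a.name + "=" + b.name); // same output as the JoinFunction
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> tom = Arrays.asList(new Event("1", "tom1", 1_000_000_055_000L),
                new Event("1", "tom2", 1_000_000_056_000L));
        List<Event> jerry = Arrays.asList(new Event("1", "jerry1", 1_000_000_055_000L),
                new Event("1", "jerry2", 1_000_000_056_000L));
        // prints [tom1=jerry1]
        System.out.println(join(tom, jerry, 3_000L, 1_000_000_056_000L, 1_000_000_056_000L));
    }
}
```

With both watermarks at 1000000056000 the model emits only `tom1=jerry1`, matching the last output line above; `tom2=jerry2` would only follow once both input watermarks reach 1000000058999, the max timestamp of [1000000056000, 1000000059000).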