flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From 邵志鹏 <bobr...@163.com>
Subject Re:会话窗口关闭时间的问题
Date Mon, 29 Apr 2019 10:58:40 GMT
您好,下面是个人理解:


首先,这个问题不是Session窗口的问题,滚动和滑动是一样的。


时间窗口的计算输出是由时间特性确定的,目前
1. 只有processing-time即处理时间(没有水位线处理乱序)能够满足及时输出窗口的结果。
2. 把eventtime的水位线时间戳设置为System.currentTimeMillis()也是可以及时输出窗口的结果,但是只会消费Flink程序启动后接收到的新消息,之前的消息是处理不到的,即便是新的消费者组group.id和earliest也无效【意思就是容错和重播失效,当然还可以再反复验证】。


目前EventTime-事件时间做到实时正确性的前提:数据的事件时间间隔小,或者小于窗口时间间隔就可以了,保证数据流不中断,这样就把不及时输出窗口的时间点无限推到无穷大的未来,即程序最终崩溃或者下线那一刻。


水位线是用来处理事件乱序的,水位线的增长依赖数据的输入,这个是很明显的咯,assignTimestampsAndWatermarks的时候根据事件时间推算的嘛,而且还会减掉一点时间,就是多掳一点数据,所以数据中断了,就是水位线停止增长了。


然后再来看,事件时间窗口默认使用的窗口触发源码:
onElement和onEventTime时才有机会TriggerResult.FIRE;
onElement时会判断水位线。


onEventTime时会根据水位线设置的时间戳定时器进行时间比较。
onEventTime往上找会找到InternalTimerServiceImpl#advanceWatermark 再往上找会到AbstractStreamOperator#processWatermark,
也就是和新的数据进来有关。


结论就是,如果当前事件时间窗口的end时间还没到,然而水位线是小于这个end时间的,如果处理乱序的间隔比较大,甚至会有多个窗口的end时间都大于最近的水位线时间戳,那不就是把窗口往后退了嘛...只有更后面的数据到来,新的水位线增长上去,前面滞留的窗口数据才有机会输出。


所以我的想法是,在每一个时间窗口上面加上一个判断,只要当前窗口未关闭未触发,窗口的end时间大于或等于自然时间点就触发【保证只触发一次就好】,不需要等到下一次水位线增长。


另外,目前的事件时间是符合自然的实时流数据语义的,可是,业务数据有时候间隔还是蛮大的,毕竟有一些阶段数据比较密集,有一些阶段数据比较稀疏。


以上为个人理解,也遇到同样的问题,甚至认为事件时间在Flink这里毫无意义,如有哪里不对的地方,做梦都想肯定是哪里不对,欢迎讨论,如果真的不对,希望能给出正确的demo,这样就可以完美的用于生产了。


还有就是我默认为,窗口是根据事件已经确定好了的:
时间窗口的生成:


模板方法-处理水位线:AbstractStreamOperator#processWatermark


InternalTimerServiceImpl#advanceWatermark


默认的事件时间触发器:





在 2019-04-29 18:06:30,by1507118@buaa.edu.cn 写道:


各位大神,你们好:

       最近有一个问题一直困扰着我:我设置的会话窗口,会在非活动状态10s后结束窗口,发现它会在下次窗口生成时才发送本窗口处理完的数据,而我想在本次窗口结束时发送这个数据,应该如何处理?万分感激

 

// 这里配置了kafka的信息,并进行数据流的输入

 

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

 

      FlinkKafkaConsumer010<RfidRawData> kafkaSource = new FlinkKafkaConsumer010<>("rfid-input-topic",

            new RfidRawDataSchema(), props);

      kafkaSource.assignTimestampsAndWatermarks(new CarTimestamp());

 

      DataStream<RfidRawData> dataStream = env.addSource(kafkaSource);

 

      // 会话窗口:如果用户处于非活动状态长达10s,则认为会话结束。Reduce中写的是窗口融合的方法

      DataStream<RfidRawData> outputStream = dataStream.keyBy("uniqueId")

      .window(EventTimeSessionWindows.withGap(Time.seconds(10))).reduce(new

      RfidReduceFunction());

  

      //通过kafka数据流的输出

outputStream.addSink(new FlinkKafkaProducer010<>("rfid-output-topic", new RfidRawDataSchema(),
props));

 

      try {

         env.execute("Flink add data source");

      } catch (Exception e) {

         // TODO Auto-generated catch block

         e.printStackTrace();

      }
Mime
  • Unnamed multipart/related (inline, None, 0 bytes)
View raw message