flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Ever" <439674...@qq.com>
Subject flink 1.8.1 时间窗口无法关闭以及消息丢失的问题
Date Fri, 12 Jul 2019 08:05:16 GMT
有一个基于事件时间的流处理程序,每10秒统计一次过去一分钟的数据。
数据每隔10秒会过来一批。
代码如下图:
```
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(watermarkGenInterval)


env.setParallelism(parallel)


env.addSource(source)
      .map(json => {
          new InvokingInfoWrapper(xxx)
        })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seconds(5))
{
        override def extractTimestamp(invoking: InvokingInfoWrapper): Long = {
          invoking.timestamp
        }
      })
      .keyBy(invokingInfo => {
        s"${invokingInfo.caller}_${invokingInfo.callee}"
      })
      .timeWindow(Time.seconds(60), Time.seconds(10))
      .reduce(innerReducer).map(invokingInfo => { // ##2map
      //some mapping code
      invokingInfo
      })
      .addSink(new WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-sink")

```




由于是在预发布环境上线, 流量不大,我观察到一个现场如下:
1. 第一条数据的时间戳为03:15:48
2. 第二条数据的时间戳为03:15:59, 触发reduce操作(5次,说明有5个滑动窗口)
3. 第三条数据的时间戳为03:16:06,   触发reduce操作(同样5次)
4. 第四条数据的时间戳为03:17:55,   这时候应该触发前三条数据所在的窗口的关闭(5个滑动窗口起码要关几个),进入到上述##2map这个步骤,
然而并没有。
5. 第五条数据的时间戳为03:18:01,  这时候触发了跟第四条数据的reduce操作。


感觉前三条数据给吞了。


为什么呢?
Mime
  • Unnamed multipart/alternative (inline, 8-Bit, 0 bytes)
View raw message