flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From 王涛@深瞳云 <taow...@deepglint.com>
Subject Re: 如何每五分钟统计一次当天某个消息的总条数
Date Wed, 06 Mar 2019 03:52:06 GMT
你好,如果是这样的需求:“按一天统计某一个key上有多少条数据,统计结果每五分钟输出更新一次”的话,
我认为可以这样:
 在一个一天的windows中做Tupel2数据的reduce,然后在下游接一个五分钟的ProcessTimeWindow,在这个五分钟的windwos中做evictor(CountEvictor.of(1)),然后输出。
 比如这样:
streamOperator
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<EventItem>() {
  @Override
  public long extractAscendingTimestamp(EventItem eventItem) {
   return eventItem.getWindowEnd();
  }
  })
  .map(eventItem -> Tuple2.of(eventItem.getItemId(), 1L))
  .keyBy(1)
   // 东八区零点到23:59:59:999的滑动事件时间窗口
  .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(+16)))
  // 在window中key上的消息条数
  .reduce((x1,x2)->new Tuple2<>(x2._1(),x1._1()+x2._2()))

  // 在5分钟的ProcessTime滑动窗口里,只取最后一条输出
  .keyBy(1)
  .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
  .evictor(CountEvictor.of(1))
  .reduce((ReduceFunction) (value1, value2) -> value2)

  .addSink(textLongSink);



这是我在使用过程中实时刷新每天统计数据的方法。

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message