flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From USERNAME <oracle...@126.com>
Subject Re:Re: FLINK 不同 StateBackend ProcessWindowFunction的差别
Date Wed, 08 Jan 2020 06:17:11 GMT
我这例子需要通过 在触发器中 TriggerResult.FIRE_AND_PURGE 来清理当前计算窗口的数据,实现增量计算,跟TTL有点区别吧。





在 2020-01-07 19:51:57,"huoguo" <greemqqran@163.com> 写道:
>
>
>过期数据能通过TTL 设置过期吗?
>
>> 在 2020年1月7日,17:54,USERNAME <oracle_cj@126.com> 写道:
>> 
>> 各位好!
>> 祝大家新年快乐!
>> 
>> 
>> 
>> 
>> --版本
>> FLINK 1.9.1 ON YARN
>> 
>> 
>> --过程
>> 1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口
>> 2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出
>> 3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据
>> --问题
>> new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉,
>> 使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。
>> 使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。
>> 这种计算场景有更好的计算方法吗?
>> 
>> 
>> --部分代码
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> 
>> 
>> new ProcessWindowFunction{
>> public void process(Tuple tuple, Context context, Iterable<StringBean> elements,
Collector<String> out) throws Exception {
>> for (Iterator<StringBean> iter = elements.iterator(); iter.hasNext(); ) {
>> ....
>> iter.remove();
>> }
>> }
>> ....
>> }
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message