flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From USERNAME <oracle...@126.com>
Subject Re:Re: FLINK 不同 StateBackend ProcessWindowFunction的差别
Date Wed, 08 Jan 2020 07:27:16 GMT
感谢 唐老师 解答!

在 2020-01-07 19:46:06,"Yun Tang" <myasuka@live.com> 写道:
>Hi
>
>使用iterator.remove() 去除state中已经计算过的数据不是一个标准做法,标准的做法应该是
clear掉相应的state [1]
>至于为什么使用MemoryStateBackend会去除数据是因为 get 返回的结果是backend中on
heap直接存储的对象[2],存在修改的副作用。
>而RocksDB state backend get返回的结果是反序列化的list,而不是RocksDB自身存储的数据
[3],也就不存在修改的副作用了。
>
>
>[1] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377
>[2] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapAppendingState.java#L57
>[3] https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java#L119
>
>祝好
>唐云
>
>________________________________
>From: USERNAME <oracle_cj@126.com>
>Sent: Tuesday, January 7, 2020 17:54
>To: user-zh@flink.apache.org <user-zh@flink.apache.org>
>Subject: FLINK 不同 StateBackend ProcessWindowFunction的差别
>
>各位好!
>祝大家新年快乐!
>
>
>
>
>--版本
>FLINK 1.9.1 ON YARN
>
>
>--过程
>1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口
>2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出
>3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据
>--问题
>new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉,
>使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。
>使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。
>这种计算场景有更好的计算方法吗?
>
>
>--部分代码
>final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
>new ProcessWindowFunction{
>public void process(Tuple tuple, Context context, Iterable<StringBean> elements,
Collector<String> out) throws Exception {
>for (Iterator<StringBean> iter = elements.iterator(); iter.hasNext(); ) {
>....
>iter.remove();
>}
>}
>....
>}
>
>
>
>
>
>
>
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message