flink-user mailing list archives

From Congxian Qiu <qcx978132...@gmail.com>
Subject Re: Protection against huge values in RocksDB List State
Date Sat, 16 May 2020 07:04:59 GMT

From what you described, I'm not sure whether MapState can help in your case.
MapState serializes each <mapKey, mapValue> pair separately, so it would
not run into the same problem as ListState.
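
To put a rough number on the ListState problem, here is a back-of-the-envelope
sketch using only figures quoted later in this thread (elements of at most
4 MiB, a JNI limit of 2^31 bytes). It ignores any per-element separator or
serialization overhead, and the class and method names are made up for
illustration:

```java
public class ListStateLimitEstimate {
    // JNI limit on a single value, as quoted in this thread: 2^31 bytes.
    static final long JNI_LIMIT_BYTES = 1L << 31;
    // Maximum element size in the Kafka topic, as quoted in this thread: 4 MiB.
    static final long MAX_ELEMENT_BYTES = 4L * 1024 * 1024;

    // Number of maximum-size elements whose concatenation reaches the limit
    // (overhead between elements is deliberately ignored).
    static long maxFullSizeElements() {
        return JNI_LIMIT_BYTES / MAX_ELEMENT_BYTES;
    }

    public static void main(String[] args) {
        System.out.println(maxFullSizeElements()); // prints 512
    }
}
```

So a few hundred maximum-size elements under one key could already be enough
to hit the limit when the whole list is read back as a single value.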

When using MapState, you need to decide how to choose the mapKey. If the
whole state is cleared after it has been processed, you can use a monotonically
increasing integer as the mapKey and store the highest mapKey used so far in a value
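
A minimal sketch of that key scheme, written against plain Java collections
rather than the Flink API so that it runs on its own. The HashMap stands in
for a MapState<Integer, byte[]>, the int counter stands in for a separate
value state, and the class name, method names and the maxElements cap are all
hypothetical. The cap is an addition that reflects the limit asked for in the
original question:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BoundedKeyedBuffer {
    // Stand-in for MapState<Integer, byte[]>: each entry is stored separately.
    private final Map<Integer, byte[]> entries = new HashMap<>();
    // Stand-in for the value state holding the next mapKey to use.
    private int nextIndex = 0;
    // Hypothetical cap on the number of elements buffered under one key.
    private final int maxElements;

    public BoundedKeyedBuffer(int maxElements) {
        this.maxElements = maxElements;
    }

    // Stores the element under the next integer key; returns false when the
    // cap has been reached and the element was rejected.
    public boolean add(byte[] element) {
        if (nextIndex >= maxElements) {
            return false;
        }
        entries.put(nextIndex, element);
        nextIndex++;
        return true;
    }

    // Returns the elements in insertion order, then clears the whole state
    // so the counter can restart from zero.
    public List<byte[]> drain() {
        List<byte[]> out = new ArrayList<>(nextIndex);
        for (int i = 0; i < nextIndex; i++) {
            out.add(entries.get(i));
        }
        entries.clear();
        nextIndex = 0;
        return out;
    }

    public int size() {
        return nextIndex;
    }
}
```

In a real job, the map and the counter would be keyed state obtained from the
runtime context, and drain() would correspond to the point where the window
or timer fires and the state is cleared.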


Yun Tang <myasuka@live.com> wrote on Fri, May 15, 2020 at 10:31 PM:

> Hi Robin
> I think you could record the size of your list under the current key with
> another value state or operator state (keep a map of <key-by key, list
> length> and store the whole map in a list when snapshotting). If you do not have
> many key-by keys, operator state is a good choice, as it is on-heap and
> lightweight.
> Best
> Yun Tang
> ------------------------------
> *From:* Robin Cassan <robin.cassan@contentsquare.com>
> *Sent:* Friday, May 15, 2020 20:59
> *To:* Yun Tang <myasuka@live.com>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: Protection against huge values in RocksDB List State
> Hi Yun, thanks for your answer! And sorry I didn't see this limitation
> in the documentation, makes sense!
> In our case, we are merging too many elements (since each element is
> limited to 4 MiB in our Kafka topic). I agree we do not want our state to
> contain really big values, this is why we are trying to find a way to put a
> limit on the number (or total size) of elements that are aggregated in the
> state of the window.
> We have found a way to do this by using another sessionWindow that is set
> before the other one, which will store the number of messages for each key
> and reject new messages if we have reached a limit, but we are wondering if
> there is a better way to achieve that without creating another state.
> Thanks again,
> Robin
> On Thu, May 14, 2020 at 19:38, Yun Tang <myasuka@live.com> wrote:
> Hi Robin
> First of all, the root cause is not that RocksDB cannot store a large list
> state when you merge, but the JNI limitation of 2^31 bytes [1].
> Moreover, RocksDB Java does not return anything when you call the merge [2]
> operator.
> Did you merge too many elements, or just elements that are too big? Last
> but not least, even if you could merge a large list, I think getting a value
> larger than 2^31 bytes would not behave well.
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
> [2]
> https://github.com/facebook/rocksdb/blob/50d63a2af01a46dd938dc1b717067339c92da040/java/src/main/java/org/rocksdb/RocksDB.java#L1382
> Best
> Yun Tang
> ------------------------------
> *From:* Robin Cassan <robin.cassan@contentsquare.com>
> *Sent:* Friday, May 15, 2020 0:37
> *To:* user <user@flink.apache.org>
> *Subject:* Protection against huge values in RocksDB List State
> Hi all!
> I cannot seem to find any setting to limit the number of records appended
> in a RocksDBListState that is used when we use SessionWindows with a
> ProcessFunction.
> It seems that, for each incoming element, the new element will be appended
> to the value with the RocksDB `merge` operator, without any safeguard to
> make sure that it doesn't grow infinitely. RocksDB merge seems to support
> returning false in case of error, so I guess we could implement a limit by
> returning false in the merge operator, but since Flink seems to use the
> "stringappendtest" merge operator (
> https://github.com/facebook/rocksdb/blob/fdf882ded218344c136c97daf76dfb59e4bc155f/utilities/merge_operators/string_append/stringappend2.cc
> ), we always return true no matter what.
> This is troublesome for us because it would make a lot of sense to specify
> an acceptable limit to how many elements can be aggregated under a given
> key, and because when we happen to have too many elements we get an
> exception from RocksDB:
> ```
> Caused by: org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB
>     at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
>     at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
>     at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:501)
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>     ... 7 more
> Caused by: org.rocksdb.RocksDBException: Requested array size exceeds VM limit
>     at org.rocksdb.RocksDB.get(Native Method)
>     at org.rocksdb.RocksDB.get(RocksDB.java:810)
>     at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
>     ... 12 more
> ```
> We are currently bypassing this by using a Reduce operator instead, which
> ensures that we only store one element per key, but this gives us degraded
> performance.
> Thanks for your input!
> Robin
