flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Navneeth Krishnan <reachnavnee...@gmail.com>
Subject Re: State Issue
Date Sun, 10 Sep 2017 06:57:58 GMT
Sorry my bad, figured out it was a change done at our end which created
different keys. Thanks.

On Fri, Sep 8, 2017 at 5:32 PM, Navneeth Krishnan <reachnavneeth2@gmail.com>
wrote:

> Hi,
>
> I'm experiencing a wired issue where any data put into map state when
> retrieved with the same key is returning as null and hence it puts the same
> value again and again. I used rocksdb state backend but tried with Memory
> state backend too but the issue still exist.
>
> Each time when I set the key and value into MapState it creates a new map
> I couldn't access the previous value. But when I iterate over the MapState
> keys and values, I can see the same key added multiple times.
>
> Each put operation goes through the code lines marked in red.
>
> *NestedMapsStateTable.java*
>
> S get(K key, int keyGroupIndex, N namespace) {
>
>    checkKeyNamespacePreconditions(key, namespace);
>
>    Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
>
>
>
> * if (namespaceMap == null) {      return null;   }*
>
>    Map<K, S> keyedMap = namespaceMap.get(namespace);
>
>    if (keyedMap == null) {
>       return null;
>    }
>
>    return keyedMap.get(key);
> }
>
>
> *HeapMapState.java*
>
> @Override
> public void put(UK userKey, UV userValue) {
>
>    HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
>
>
>
> * if (userMap == null) {      userMap = new HashMap<>();      stateTable.put(currentNamespace,
userMap);   }*
>
>    userMap.put(userKey, userValue);
> }
>
>
> *My Code:*
>
> *open()*
>
> MapStateDescriptor<String, String> testStateDescriptor = new MapStateDescriptor<>("test-state",
>         TypeInformation.of(new TypeHint<String>() {}), TypeInformation.of(new TypeHint<String>()
{}));
>
> testState = getRuntimeContext().getMapState(testStateDescriptor);
>
>
> *flatMap:*
>
> if(testState.contains(user)){
>     *// DO Something*
> } else {
>     testState.put(user, userInfo);
> }
>
>
> streamEnv.setStateBackend(new MemoryStateBackend());
>
> streamEnv.setParallelism(1);
>
>
> Thanks
>
>

Mime
View raw message