flink-user mailing list archives

From Navneeth Krishnan <reachnavnee...@gmail.com>
Subject State Issue
Date Sat, 09 Sep 2017 00:32:16 GMT
Hi,

I'm experiencing a weird issue: any data I put into MapState comes back as
null when I retrieve it with the same key, so the same value gets put again
and again. I am using the RocksDB state backend, but I also tried the memory
state backend and the issue still exists.

Each time I put a key and value into the MapState, it creates a new map, and
I can't access the previous value. But when I iterate over the MapState keys
and values, I can see the same key added multiple times.

Each put operation goes through the code lines marked in red.

*NestedMapsStateTable.java*

S get(K key, int keyGroupIndex, N namespace) {

   checkKeyNamespacePreconditions(key, namespace);

   Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);

   // Highlighted in red in the original message
   if (namespaceMap == null) {
      return null;
   }

   Map<K, S> keyedMap = namespaceMap.get(namespace);

   if (keyedMap == null) {
      return null;
   }

   return keyedMap.get(key);
}


*HeapMapState.java*

@Override
public void put(UK userKey, UV userValue) {

   HashMap<UK, UV> userMap = stateTable.get(currentNamespace);

   // Highlighted in red in the original message
   if (userMap == null) {
      userMap = new HashMap<>();
      stateTable.put(currentNamespace, userMap);
   }

   userMap.put(userKey, userValue);
}
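
For reference, the two code paths above can be modeled with a small stand-alone sketch. This is plain Java, not Flink code: the class name KeyedMapStateSketch, the setCurrentKey method, and the fixed "default" namespace are all hypothetical, and the model assumes only that the heap state table is scoped by the key of the element currently being processed. Under that assumption, a put made under one current key is not visible under another, and a fresh user map is created for each new current key.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of heap-backed keyed map state; not Flink's actual classes. */
public class KeyedMapStateSketch {

    // namespace -> current key -> user map (a single key group is assumed)
    private final Map<String, Map<String, Map<String, String>>> table = new HashMap<>();
    private final String namespace = "default"; // assumption: one fixed namespace
    private String currentKey;

    /** Stands in for the runtime selecting the key of the element being processed. */
    public void setCurrentKey(String key) {
        this.currentKey = key;
    }

    /** Mirrors the null-returning lookup: no map for the namespace or key means null. */
    private Map<String, String> lookupUserMap() {
        Map<String, Map<String, String>> keyedMap = table.get(namespace);
        if (keyedMap == null) {
            return null;
        }
        return keyedMap.get(currentKey);
    }

    public boolean contains(String userKey) {
        Map<String, String> userMap = lookupUserMap();
        return userMap != null && userMap.containsKey(userKey);
    }

    /** Mirrors put(): create the user map lazily for the current key, then store the entry. */
    public void put(String userKey, String userValue) {
        Map<String, String> userMap = lookupUserMap();
        if (userMap == null) {
            userMap = new HashMap<>();
            table.computeIfAbsent(namespace, n -> new HashMap<>()).put(currentKey, userMap);
        }
        userMap.put(userKey, userValue);
    }

    public static void main(String[] args) {
        KeyedMapStateSketch state = new KeyedMapStateSketch();

        state.setCurrentKey("key-A");
        state.put("alice", "info");
        System.out.println(state.contains("alice")); // prints true: same current key

        state.setCurrentKey("key-B");
        System.out.println(state.contains("alice")); // prints false: different current key
    }
}
```

If the job behaves like this model, checking which key the stream is keyed by for each incoming element would be a reasonable first diagnostic step.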


*My Code:*

*open()*

MapStateDescriptor<String, String> testStateDescriptor =
        new MapStateDescriptor<>(
                "test-state",
                TypeInformation.of(new TypeHint<String>() {}),
                TypeInformation.of(new TypeHint<String>() {}));

testState = getRuntimeContext().getMapState(testStateDescriptor);


*flatMap:*

if(testState.contains(user)){
    // Do something
} else {
    testState.put(user, userInfo);
}


streamEnv.setStateBackend(new MemoryStateBackend());

streamEnv.setParallelism(1);


Thanks
