flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [6/7] flink git commit: [FLINK-7700] Fix RocksDB AggregatingState merging
Date Mon, 02 Oct 2017 08:19:41 GMT
[FLINK-7700] Fix RocksDB AggregatingState merging

Before, the merged state was not cleared.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0a7c091
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0a7c091
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0a7c091

Branch: refs/heads/release-1.3
Commit: b0a7c091856aa2a99715a9fddcc801f660414cfd
Parents: 0ffca86
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Sep 27 13:01:55 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Thu Sep 28 17:26:29 2017 +0200

----------------------------------------------------------------------
 .../flink/contrib/streaming/state/RocksDBAggregatingState.java      | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7c091/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
index 1f306b4..c72b94e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java
@@ -157,6 +157,7 @@ public class RocksDBAggregatingState<K, N, T, ACC, R>
 					
 					final byte[] sourceKey = keySerializationStream.toByteArray();
 					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
+					backend.db.remove(columnFamily, sourceKey);
 
 					if (valueBytes != null) {
 						ACC value = valueSerializer.deserialize(


Mime
View raw message