kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5717; InMemoryKeyValueStore should delete keys with null values during restore
Date Wed, 09 Aug 2017 19:03:33 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a593db6a2 -> c35c47981


KAFKA-5717; InMemoryKeyValueStore should delete keys with null values during restore

Fixed a bug in the InMemoryKeyValueStore restoration where a key with a `null` value is written
in to the map rather than being deleted.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #3650 from dguy/kafka-5717


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c35c4798
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c35c4798
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c35c4798

Branch: refs/heads/trunk
Commit: c35c4798139bc30e3a380311e45a22ba56fcc918
Parents: a593db6
Author: Damian Guy <damian.guy@gmail.com>
Authored: Wed Aug 9 20:03:28 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Wed Aug 9 20:03:28 2017 +0100

----------------------------------------------------------------------
 .../state/internals/InMemoryKeyValueStore.java  |  4 ++--
 .../internals/InMemoryKeyValueStoreTest.java    | 24 ++++++++++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c35c4798/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 41c6de3..7e24969 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -75,9 +75,9 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K,
V> {
             context.register(root, true, new StateRestoreCallback() {
                 @Override
                 public void restore(byte[] key, byte[] value) {
-                    // check value for null, to avoid  deserialization error.
+                    // this is a delete
                     if (value == null) {
-                        put(serdes.keyFrom(key), null);
+                        delete(serdes.keyFrom(key));
                     } else {
                         put(serdes.keyFrom(key), serdes.valueFrom(value));
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c35c4798/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 222ec71..541c003 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -20,6 +20,11 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
 
 public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
 
@@ -42,4 +47,23 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest
{
         store.init(context, store);
         return store;
     }
+
+    @Test
+    public void shouldRemoveKeysWithNullValues() {
+        store.close();
+        // Add any entries that will be restored to any store
+        // that uses the driver's context ...
+        driver.addEntryToRestoreLog(0, "zero");
+        driver.addEntryToRestoreLog(1, "one");
+        driver.addEntryToRestoreLog(2, "two");
+        driver.addEntryToRestoreLog(3, "three");
+        driver.addEntryToRestoreLog(0, null);
+
+        store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+        context.restore(store.name(), driver.restoredEntries());
+
+        assertEquals(3, driver.sizeOf(store));
+
+        assertThat(store.get(0), nullValue());
+    }
 }


Mime
View raw message