kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-3941: Delay eviction listener in InMemoryKeyValueLoggedStore after restoration
Date Thu, 14 Jul 2016 01:11:30 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 3537063a5 -> f17790cd9


KAFKA-3941: Delay eviction listener in InMemoryKeyValueLoggedStore after restoration

Also move the initialization that restores from changelog to inner stores.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Eno Thereska, Dan Norwood

Closes #1610 from guozhangwang/K3941-avoid-eviction-listener


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f17790cd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f17790cd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f17790cd

Branch: refs/heads/trunk
Commit: f17790cd9952aecef67fbec58122c55c72e5c2b2
Parents: 3537063
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Wed Jul 13 18:11:25 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jul 13 18:11:25 2016 -0700

----------------------------------------------------------------------
 .../internals/InMemoryKeyValueLoggedStore.java  | 31 ++++++++---------
 .../InMemoryKeyValueStoreSupplier.java          | 26 +++++++++++++--
 .../InMemoryLRUCacheStoreSupplier.java          | 15 +++------
 .../streams/state/internals/MemoryLRUCache.java | 35 +++++++++++++++++---
 .../internals/MemoryNavigableLRUCache.java      | 10 +++---
 .../internals/RocksDBKeyValueStoreSupplier.java |  4 ++-
 .../streams/state/internals/RocksDBStore.java   |  2 +-
 .../internals/RocksDBWindowStoreSupplier.java   |  4 ++-
 .../processor/internals/StandbyTaskTest.java    |  4 ---
 9 files changed, 84 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f17790cd/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index 3c2e659..c00bf76 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -20,7 +20,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -35,7 +34,6 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K,
V> {
     private final Serde<V> valueSerde;
     private final String storeName;
 
-    private StateSerdes<K, V> serdes;
     private StoreChangeLogger<K, V> changeLogger;
     private StoreChangeLogger.ValueGetter<K, V> getter;
 
@@ -54,34 +52,31 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K,
V> {
     @Override
     @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
+        inner.init(context, root);
+
         // construct the serde
-        this.serdes = new StateSerdes<>(storeName,
+        StateSerdes<K, V>  serdes = new StateSerdes<>(storeName,
                 keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
                 valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
         this.changeLogger = new StoreChangeLogger<>(storeName, context, serdes);
 
-        context.register(root, true, new StateRestoreCallback() {
-            @Override
-            public void restore(byte[] key, byte[] value) {
-
-                // directly call inner functions so that the operation is not logged. Check
value for null, to avoid  deserialization error.
-                if (value == null) {
-                    inner.put(serdes.keyFrom(key), null);
-                } else {
-                    inner.put(serdes.keyFrom(key), serdes.valueFrom(value));
-                }
-            }
-        });
-
-        inner.init(context, root);
-
         this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
             @Override
             public V get(K key) {
                 return inner.get(key);
             }
         };
+
+        // if the inner store is an LRU cache, add the eviction listener to log removed record
+        if (inner instanceof MemoryLRUCache) {
+            ((MemoryLRUCache<K, V>) inner).whenEldestRemoved(new MemoryNavigableLRUCache.EldestEntryRemovalListener<K,
V>() {
+                @Override
+                public void apply(K key, V value) {
+                    removed(key);
+                }
+            });
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f17790cd/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 3b632cc..0cc4586 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -21,10 +21,12 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.Iterator;
 import java.util.List;
@@ -67,7 +69,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier
{
     }
 
     public StateStore get() {
-        return new MeteredKeyValueStore<>(new MemoryStore<>(name, keySerde, valueSerde).enableLogging(),
"in-memory-state", time);
+        MemoryStore<K, V> store = new MemoryStore<>(name, keySerde, valueSerde);
+
+        return new MeteredKeyValueStore<>(store.enableLogging(), "in-memory-state",
time);
     }
 
     private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
@@ -76,6 +80,8 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier
{
         private final Serde<V> valueSerde;
         private final NavigableMap<K, V> map;
 
+        private StateSerdes<K, V> serdes;
+
         public MemoryStore(String name, Serde<K> keySerde, Serde<V> valueSerde)
{
             this.name = name;
             this.keySerde = keySerde;
@@ -98,7 +104,23 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier
{
         @Override
         @SuppressWarnings("unchecked")
         public void init(ProcessorContext context, StateStore root) {
-            // do nothing
+            // construct the serde
+            this.serdes = new StateSerdes<>(name,
+                    keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                    valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+
+            // register the store
+            context.register(root, true, new StateRestoreCallback() {
+                @Override
+                public void restore(byte[] key, byte[] value) {
+                    // check value for null, to avoid  deserialization error.
+                    if (value == null) {
+                        put(serdes.keyFrom(key), null);
+                    } else {
+                        put(serdes.keyFrom(key), serdes.valueFrom(value));
+                    }
+                }
+            });
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f17790cd/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
index 4a4fa5f..20a7333 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -52,17 +52,10 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier
{
         return name;
     }
 
-    @SuppressWarnings("unchecked")
     public StateStore get() {
-        final MemoryNavigableLRUCache<K, V> cache = new MemoryNavigableLRUCache<K,
V>(name, capacity);
-        final InMemoryKeyValueLoggedStore<K, V> loggedCache = (InMemoryKeyValueLoggedStore)
cache.enableLogging(keySerde, valueSerde);
-        final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(loggedCache,
"in-memory-lru-state", time);
-        cache.whenEldestRemoved(new MemoryNavigableLRUCache.EldestEntryRemovalListener<K,
V>() {
-            @Override
-            public void apply(K key, V value) {
-                loggedCache.removed(key);
-            }
-        });
-        return store;
+        MemoryNavigableLRUCache<K, V> cache = new MemoryNavigableLRUCache<>(name,
capacity, keySerde, valueSerde);
+        InMemoryKeyValueLoggedStore<K, V> loggedCache = (InMemoryKeyValueLoggedStore<K,
V>) cache.enableLogging();
+
+        return new MeteredKeyValueStore<>(loggedCache, "in-memory-lru-state", time);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f17790cd/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 0697eda..083f811 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -19,9 +19,11 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -48,16 +50,25 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K,
V> {
         void apply(K key, V value);
     }
 
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
+
     protected String name;
     protected Map<K, V> map;
     protected Set<K> keys;
+    private StateSerdes<K, V> serdes;
 
     protected EldestEntryRemovalListener<K, V> listener;
 
     // this is used for extended MemoryNavigableLRUCache only
-    public MemoryLRUCache() {}
+    public MemoryLRUCache(Serde<K> keySerde, Serde<V> valueSerde) {
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+    }
+
+    public MemoryLRUCache(String name, final int maxCacheSize, Serde<K> keySerde, Serde<V>
valueSerde) {
+        this(keySerde, valueSerde);
 
-    public MemoryLRUCache(String name, final int maxCacheSize) {
         this.name = name;
         this.keys = new HashSet<>();
 
@@ -78,7 +89,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V>
{
         };
     }
 
-    public KeyValueStore<K, V> enableLogging(Serde<K> keySerde, Serde<V>
valueSerde) {
+    public KeyValueStore<K, V> enableLogging() {
         return new InMemoryKeyValueLoggedStore<>(this.name, this, keySerde, valueSerde);
     }
 
@@ -96,7 +107,23 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K,
V> {
     @Override
     @SuppressWarnings("unchecked")
     public void init(ProcessorContext context, StateStore root) {
-        // do nothing
+        // construct the serde
+        this.serdes = new StateSerdes<>(name,
+                keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+                valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+
+        // register the store
+        context.register(root, true, new StateRestoreCallback() {
+            @Override
+            public void restore(byte[] key, byte[] value) {
+                // check value for null, to avoid  deserialization error.
+                if (value == null) {
+                    put(serdes.keyFrom(key), null);
+                } else {
+                    put(serdes.keyFrom(key), serdes.valueFrom(value));
+                }
+            }
+        });
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f17790cd/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index 99bac93..5eb4f49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.KeyValueIterator;
 
@@ -27,8 +28,8 @@ import java.util.TreeSet;
 
 public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
 
-    public MemoryNavigableLRUCache(String name, final int maxCacheSize) {
-        super();
+    public MemoryNavigableLRUCache(String name, final int maxCacheSize, Serde<K> keySerde,
Serde<V> valueSerde) {
+        super(keySerde, valueSerde);
 
         this.name = name;
         this.keys = new TreeSet<>();
@@ -57,15 +58,14 @@ public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K,
V> {
         return this;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public KeyValueIterator<K, V> range(K from, K to) {
-        return new MemoryNavigableLRUCache.CacheIterator<K, V>(((NavigableSet) this.keys).subSet(from,
true, to, false).iterator(), this.map);
+        return new MemoryNavigableLRUCache.CacheIterator<>(((NavigableSet<K>)
this.keys).subSet(from, true, to, false).iterator(), this.map);
     }
 
     @Override
     public KeyValueIterator<K, V> all() {
-        return new MemoryNavigableLRUCache.CacheIterator<K, V>(this.keys.iterator(),
this.map);
+        return new MemoryNavigableLRUCache.CacheIterator<>(this.keys.iterator(), this.map);
     }
 
     private static class CacheIterator<K, V> implements KeyValueIterator<K, V>
{

http://git-wip-us.apache.org/repos/asf/kafka/blob/f17790cd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index af98733..16111ad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -53,6 +53,8 @@ public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier
{
     }
 
     public StateStore get() {
-        return new MeteredKeyValueStore<>(new RocksDBStore<>(name, keySerde,
valueSerde).enableLogging(), "rocksdb-state", time);
+        RocksDBStore<K, V> store = new RocksDBStore<>(name, keySerde, valueSerde);
+
+        return new MeteredKeyValueStore<>(store.enableLogging(), "rocksdb-state", time);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f17790cd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 394406f..baf5b9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -173,7 +173,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V>
{
         this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context,
WindowStoreUtils.INNER_SERDES) : null;
 
         if (this.cacheSize > 0) {
-            this.cache = new MemoryLRUCache<K, RocksDBCacheEntry>(name, cacheSize)
+            this.cache = new MemoryLRUCache<K, RocksDBCacheEntry>(name, cacheSize,
null, null)
                     .whenEldestRemoved(new MemoryLRUCache.EldestEntryRemovalListener<K,
RocksDBCacheEntry>() {
                         @Override
                         public void apply(K key, RocksDBCacheEntry entry) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f17790cd/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index 0407299..3a1bd59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -59,7 +59,9 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier
{
     }
 
     public StateStore get() {
-        return new MeteredWindowStore<>(new RocksDBWindowStore<>(name, retentionPeriod,
numSegments, retainDuplicates, keySerde, valueSerde).enableLogging(), "rocksdb-window", time);
+        RocksDBWindowStore<K, V> store = new RocksDBWindowStore<>(name, retentionPeriod,
numSegments, retainDuplicates, keySerde, valueSerde);
+
+        return new MeteredWindowStore<>(store.enableLogging(), "rocksdb-window", time);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f17790cd/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index e7fb9a4..9e15e1c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -115,15 +115,11 @@ public class StandbyTaskTest {
                 new PartitionInfo(storeChangelogTopicName1, 2, Node.noNode(), new Node[0],
new Node[0])
         ));
 
-        System.out.println("added " + storeChangelogTopicName1);
-
         restoreStateConsumer.updatePartitions(storeChangelogTopicName2, Utils.mkList(
                 new PartitionInfo(storeChangelogTopicName2, 0, Node.noNode(), new Node[0],
new Node[0]),
                 new PartitionInfo(storeChangelogTopicName2, 1, Node.noNode(), new Node[0],
new Node[0]),
                 new PartitionInfo(storeChangelogTopicName2, 2, Node.noNode(), new Node[0],
new Node[0])
         ));
-
-        System.out.println("added " + storeChangelogTopicName2);
     }
 
     @Test


Mime
View raw message