kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/5] kafka git commit: HOTFIX: Fix putAll and putIfAbsent logic for correct eviction behavior
Date Wed, 23 Nov 2016 15:28:36 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 3c0713923 -> f91d95ac9


HOTFIX: Fix putAll and putIfAbsent logic for correct eviction behavior

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy, Guozhang Wang

Closes #2038 from enothereska/hotfix-put-cache


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4fdbc72f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4fdbc72f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4fdbc72f

Branch: refs/heads/0.10.1
Commit: 4fdbc72fe099cbab200cc690f6075c69cbbefb4a
Parents: 3c07139
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Wed Oct 19 14:01:23 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Nov 23 07:15:11 2016 -0800

----------------------------------------------------------------------
 .../streams/state/internals/ThreadCache.java    | 14 ++++++--
 .../state/internals/ThreadCacheTest.java        | 37 ++++++++++++++++++++
 2 files changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4fdbc72f/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index d76e5c8..f7355d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -123,12 +123,20 @@ public class ThreadCache {
 
     public LRUCacheEntry putIfAbsent(final String namespace, byte[] key, LRUCacheEntry value) {
         final NamedCache cache = getOrCreateCache(namespace);
-        return cache.putIfAbsent(Bytes.wrap(key), value);
+
+        final LRUCacheEntry result = cache.putIfAbsent(Bytes.wrap(key), value);
+        maybeEvict(namespace);
+
+        if (result == null) {
+            numPuts++;
+        }
+        return result;
     }
 
     public void putAll(final String namespace, final List<KeyValue<byte[], LRUCacheEntry>> entries) {
-        final NamedCache cache = getOrCreateCache(namespace);
-        cache.putAll(entries);
+        for (KeyValue<byte[], LRUCacheEntry> entry : entries) {
+            put(namespace, entry.key, entry.value);
+        }
     }
 
     public LRUCacheEntry delete(final String namespace, final byte[] key) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/4fdbc72f/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 2ff3b89..b07da6e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -389,6 +389,24 @@ public class ThreadCacheTest {
     }
 
     @Test
+    public void shouldEvictAfterPutAll() throws Exception {
+        final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
+        final String namespace = "namespace";
+        final ThreadCache cache = new ThreadCache(1);
+        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
+            @Override
+            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+                received.addAll(dirty);
+            }
+        });
+
+        cache.putAll(namespace, Arrays.asList(KeyValue.pair(new byte[]{0}, dirtyEntry(new byte[]{5})),
+            KeyValue.pair(new byte[]{1}, dirtyEntry(new byte[]{6}))));
+
+        assertEquals(cache.evicts(), 2);
+    }
+
+    @Test
     public void shouldPutAll() throws Exception {
         final ThreadCache cache = new ThreadCache(100000);
 
@@ -422,6 +440,25 @@ public class ThreadCacheTest {
         assertArrayEquals(value, cache.get("n", key).value);
     }
 
+    @Test
+    public void shouldEvictAfterPutIfAbsent() throws Exception {
+        final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
+        final String namespace = "namespace";
+        final ThreadCache cache = new ThreadCache(1);
+        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
+            @Override
+            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+                received.addAll(dirty);
+            }
+        });
+
+        cache.putIfAbsent(namespace, new byte[]{0}, dirtyEntry(new byte[]{5}));
+        cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6}));
+        cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6}));
+
+        assertEquals(cache.evicts(), 3);
+    }
+
     private LRUCacheEntry dirtyEntry(final byte[] key) {
         return new LRUCacheEntry(key, true, -1, -1, -1, "");
     }


Mime
View raw message