kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5216: Fix peekNextKey in cached window/session store iterators
Date Fri, 12 May 2017 23:51:28 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 29214d336 -> 05c13552f


KAFKA-5216: Fix peekNextKey in cached window/session store iterators

guozhangwang mjsax dguy

Author: Xavier Léauté <xavier@confluent.io>

Reviewers: Damian Guy, Matthias J. Sax, Guozhang Wang

Closes #3016 from xvrl/kafka-5216


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/05c13552
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/05c13552
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/05c13552

Branch: refs/heads/0.10.2
Commit: 05c13552f910b20aa45e221f92614cffe76c08fc
Parents: 29214d3
Author: Xavier Léauté <xavier@confluent.io>
Authored: Fri May 12 15:27:03 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri May 12 16:51:21 2017 -0700

----------------------------------------------------------------------
 .../AbstractMergedSortedCacheStoreIterator.java   |  2 +-
 .../MergedSortedCacheWindowStoreIteratorTest.java | 18 ++++++++++++++++--
 2 files changed, 17 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/05c13552/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
index 009dad0..344ca5a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
@@ -137,7 +137,7 @@ abstract class AbstractMergedSortedCacheStoreIterator<K, KS, V> implements KeyVa
         }
 
         if (nextStoreKey == null) {
-            return serdes.keyFrom(nextCacheKey.get());
+            return deserializeCacheKey(nextCacheKey);
         }
 
         final int comparison = compare(nextCacheKey, nextStoreKey);

http://git-wip-us.apache.org/repos/asf/kafka/blob/05c13552/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
index 376fca8..f209632 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIteratorTest.java
@@ -72,7 +72,7 @@ public class MergedSortedCacheWindowStoreIteratorTest {
     }
 
     @Test
-    public void shouldPeekNextKey() throws Exception {
+    public void shouldPeekNextStoreKey() throws Exception {
         windowStoreKvPairs.add(KeyValue.pair(10L, "a".getBytes()));
         cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes), new LRUCacheEntry("b".getBytes()));
         byte[] binaryFrom = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
@@ -85,4 +85,18 @@ public class MergedSortedCacheWindowStoreIteratorTest {
         assertThat(iterator.peekNextKey(), equalTo(10L));
     }
 
-}
\ No newline at end of file
+    @Test
+    public void shouldPeekNextCacheKey() throws Exception {
+        windowStoreKvPairs.add(KeyValue.pair(0L, "a".getBytes()));
+        cache.put(namespace, WindowStoreUtils.toBinaryKey("a", 10L, 0, stateSerdes), new LRUCacheEntry("b".getBytes()));
+        Bytes fromBytes = WindowStoreUtils.toBinaryKey("a", 0, 0, stateSerdes);
+        Bytes toBytes = WindowStoreUtils.toBinaryKey("a", 100, 0, stateSerdes);
+        final KeyValueIterator<Long, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, fromBytes, toBytes);
+        final MergedSortedCacheWindowStoreIterator<byte[]> iterator = new MergedSortedCacheWindowStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.Long(), Serdes.ByteArray()));
+        assertThat(iterator.peekNextKey(), equalTo(0L));
+        iterator.next();
+        assertThat(iterator.peekNextKey(), equalTo(10L));
+    }
+
+}


Mime
View raw message