kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/3] kafka git commit: KAFKA-5192: add WindowStore range scan (KIP-155)
Date Fri, 19 May 2017 00:03:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 b661d3b8a -> 9170e87b2


http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 5e0e55c..040def6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
@@ -34,6 +35,7 @@ import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.MockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Test;
@@ -62,7 +64,7 @@ public class RocksDBWindowStoreTest {
     private static final long DEFAULT_CACHE_SIZE_BYTES = 1024 * 1024L;
 
     private final int numSegments = 3;
-    private final long windowSize = 3;
+    private final static long WINDOW_SIZE = 3;
     private final String windowName = "window";
     private final long segmentSize = Segments.MIN_SEGMENT_INTERVAL;
     private final long retentionPeriod = segmentSize * (numSegments - 1);
@@ -95,7 +97,8 @@ public class RocksDBWindowStoreTest {
 
     @SuppressWarnings("unchecked")
     private <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, final boolean enableCaching, final boolean retainDuplicates) {
-        final RocksDBWindowStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, retainDuplicates, Serdes.Integer(), Serdes.String(), windowSize, true, Collections.<String, String>emptyMap(), enableCaching);
+        final RocksDBWindowStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, retainDuplicates, Serdes.Integer(), Serdes.String(),
+                                                                                     WINDOW_SIZE, true, Collections.<String, String>emptyMap(), enableCaching);
         final WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
         store.init(context, store);
         return store;
@@ -148,30 +151,30 @@ public class RocksDBWindowStoreTest {
 
         putFirstBatch(windowStore, startTime, context);
 
-        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize)));
-        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize)));
-        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize)));
-        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize)));
-        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize)));
+        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - WINDOW_SIZE, startTime + 1L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - WINDOW_SIZE, startTime + 2L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - WINDOW_SIZE, startTime + 3L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - WINDOW_SIZE, startTime + 4L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - WINDOW_SIZE, startTime + 5L + WINDOW_SIZE)));
 
         putSecondBatch(windowStore, startTime, context);
 
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize)));
-        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize)));
-        assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime - windowSize, startTime + windowSize)));
-        assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize)));
-        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
-        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize)));
-        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize)));
-        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize)));
-        assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize)));
-        assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize)));
-        assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize)));
-        assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize)));
-        assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize)));
-        assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L - WINDOW_SIZE, startTime - 2L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L - WINDOW_SIZE, startTime - 1L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L - WINDOW_SIZE, startTime + 1L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L - WINDOW_SIZE, startTime + 2L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L - WINDOW_SIZE, startTime + 3L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L - WINDOW_SIZE, startTime + 4L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L - WINDOW_SIZE, startTime + 5L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L - WINDOW_SIZE, startTime + 6L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L - WINDOW_SIZE, startTime + 7L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - WINDOW_SIZE, startTime + 8L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - WINDOW_SIZE, startTime + 9L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - WINDOW_SIZE, startTime + 10L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - WINDOW_SIZE, startTime + 11L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - WINDOW_SIZE, startTime + 12L + WINDOW_SIZE)));
 
         // Flush the store and verify all current entries were properly flushed ...
         windowStore.flush();
@@ -189,36 +192,85 @@ public class RocksDBWindowStoreTest {
 
     @SuppressWarnings("unchecked")
     @Test
+    public void testFetchRange() throws IOException {
+        windowStore = createWindowStore(context, false, true);
+        long startTime = segmentSize - 4L;
+
+        putFirstBatch(windowStore, startTime, context);
+
+        final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
+        final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
+        final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
+        final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
+        final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", startTime + 5);
+
+        assertEquals(
+            Utils.mkList(zero, one),
+            StreamsTestUtils.toList(windowStore.fetch(0, 1, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE))
+        );
+        assertEquals(
+            Utils.mkList(one),
+            StreamsTestUtils.toList(windowStore.fetch(1, 1, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE))
+        );
+        assertEquals(
+            Utils.mkList(one, two),
+            StreamsTestUtils.toList(windowStore.fetch(1, 3, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE))
+        );
+        assertEquals(
+            Utils.mkList(zero, one, two),
+            StreamsTestUtils.toList(windowStore.fetch(0, 5, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE))
+        );
+        assertEquals(
+            Utils.mkList(zero, one, two,
+                         four, five),
+            StreamsTestUtils.toList(windowStore.fetch(0, 5, startTime + 0L - WINDOW_SIZE, startTime + 0L + WINDOW_SIZE + 5L))
+        );
+        assertEquals(
+            Utils.mkList(two, four, five),
+            StreamsTestUtils.toList(windowStore.fetch(0, 5, startTime + 2L, startTime + 0L + WINDOW_SIZE + 5L))
+        );
+        assertEquals(
+            Utils.mkList(),
+            StreamsTestUtils.toList(windowStore.fetch(4, 5, startTime + 2L, startTime + WINDOW_SIZE))
+        );
+        assertEquals(
+            Utils.mkList(),
+            StreamsTestUtils.toList(windowStore.fetch(0, 3, startTime + 3L, startTime + WINDOW_SIZE + 5))
+        );
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
     public void testPutAndFetchBefore() throws IOException {
         windowStore = createWindowStore(context, false, true);
         long startTime = segmentSize - 4L;
 
         putFirstBatch(windowStore, startTime, context);
 
-        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - windowSize, startTime + 0L)));
-        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - windowSize, startTime + 1L)));
-        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - windowSize, startTime + 3L)));
-        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - windowSize, startTime + 4L)));
-        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - windowSize, startTime + 5L)));
+        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L - WINDOW_SIZE, startTime + 0L)));
+        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L - WINDOW_SIZE, startTime + 1L)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - WINDOW_SIZE, startTime + 2L)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L - WINDOW_SIZE, startTime + 3L)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L - WINDOW_SIZE, startTime + 4L)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L - WINDOW_SIZE, startTime + 5L)));
 
         putSecondBatch(windowStore, startTime, context);
 
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 1L - windowSize, startTime - 1L)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 0L - windowSize, startTime + 0L)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 1L - windowSize, startTime + 1L)));
-        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
-        assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime + 3L - windowSize, startTime + 3L)));
-        assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 4L - windowSize, startTime + 4L)));
-        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 5L - windowSize, startTime + 5L)));
-        assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 6L - windowSize, startTime + 6L)));
-        assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 7L - windowSize, startTime + 7L)));
-        assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - windowSize, startTime + 8L)));
-        assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - windowSize, startTime + 9L)));
-        assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - windowSize, startTime + 10L)));
-        assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - windowSize, startTime + 11L)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - windowSize, startTime + 12L)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 13L - windowSize, startTime + 13L)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 1L - WINDOW_SIZE, startTime - 1L)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 0L - WINDOW_SIZE, startTime + 0L)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 1L - WINDOW_SIZE, startTime + 1L)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L - WINDOW_SIZE, startTime + 2L)));
+        assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime + 3L - WINDOW_SIZE, startTime + 3L)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 4L - WINDOW_SIZE, startTime + 4L)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 5L - WINDOW_SIZE, startTime + 5L)));
+        assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 6L - WINDOW_SIZE, startTime + 6L)));
+        assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 7L - WINDOW_SIZE, startTime + 7L)));
+        assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 8L - WINDOW_SIZE, startTime + 8L)));
+        assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 9L - WINDOW_SIZE, startTime + 9L)));
+        assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 10L - WINDOW_SIZE, startTime + 10L)));
+        assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 11L - WINDOW_SIZE, startTime + 11L)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L - WINDOW_SIZE, startTime + 12L)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 13L - WINDOW_SIZE, startTime + 13L)));
 
         // Flush the store and verify all current entries were properly flushed ...
         windowStore.flush();
@@ -242,30 +294,30 @@ public class RocksDBWindowStoreTest {
 
         putFirstBatch(windowStore, startTime, context);
 
-        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L, startTime + 0L + windowSize)));
-        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L, startTime + 1L + windowSize)));
-        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L, startTime + 3L + windowSize)));
-        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L, startTime + 4L + windowSize)));
-        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L, startTime + 5L + windowSize)));
+        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime + 0L, startTime + 0L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + 1L, startTime + 1L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + 3L, startTime + 3L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + 4L, startTime + 4L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + 5L, startTime + 5L + WINDOW_SIZE)));
 
         putSecondBatch(windowStore, startTime, context);
 
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L, startTime - 2L + windowSize)));
-        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L, startTime - 1L + windowSize)));
-        assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime, startTime + windowSize)));
-        assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L, startTime + 1L + windowSize)));
-        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
-        assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L, startTime + 3L + windowSize)));
-        assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L, startTime + 4L + windowSize)));
-        assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L, startTime + 5L + windowSize)));
-        assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L, startTime + 6L + windowSize)));
-        assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L, startTime + 7L + windowSize)));
-        assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 8L, startTime + 8L + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 9L, startTime + 9L + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 10L, startTime + 10L + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 11L, startTime + 11L + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L, startTime + 12L + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime - 2L, startTime - 2L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime - 1L, startTime - 1L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two", "two+1"), toList(windowStore.fetch(2, startTime, startTime + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(windowStore.fetch(2, startTime + 1L, startTime + 1L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, startTime + 2L, startTime + 2L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, startTime + 3L, startTime + 3L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, startTime + 4L, startTime + 4L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 5L, startTime + 5L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, startTime + 6L, startTime + 6L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two+5", "two+6"), toList(windowStore.fetch(2, startTime + 7L, startTime + 7L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two+6"), toList(windowStore.fetch(2, startTime + 8L, startTime + 8L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 9L, startTime + 9L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 10L, startTime + 10L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 11L, startTime + 11L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + 12L, startTime + 12L + WINDOW_SIZE)));
 
         // Flush the store and verify all current entries were properly flushed ...
         windowStore.flush();
@@ -290,17 +342,17 @@ public class RocksDBWindowStoreTest {
         context.setRecordContext(createRecordContext(startTime));
         windowStore.put(0, "zero");
 
-        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
+        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
 
         windowStore.put(0, "zero");
         windowStore.put(0, "zero+");
         windowStore.put(0, "zero++");
 
-        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
-        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize)));
-        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize)));
-        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 1L - WINDOW_SIZE, startTime + 1L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 2L - WINDOW_SIZE, startTime + 2L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, startTime + 3L - WINDOW_SIZE, startTime + 3L + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime + 4L - WINDOW_SIZE, startTime + 4L + WINDOW_SIZE)));
 
         // Flush the store and verify all current entries were properly flushed ...
         windowStore.flush();
@@ -351,12 +403,12 @@ public class RocksDBWindowStoreTest {
                                  segments.segmentName(3),
                                  segments.segmentName(4)), segmentDirs(baseDir));
 
-        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
-        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
-        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
-        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
-        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+        assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("one"), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE)));
 
         context.setRecordContext(createRecordContext(startTime + incr * 6));
         windowStore.put(6, "six");
@@ -365,13 +417,13 @@ public class RocksDBWindowStoreTest {
                                  segments.segmentName(5)), segmentDirs(baseDir));
 
 
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
-        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
-        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
-        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
-        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE)));
 
 
         context.setRecordContext(createRecordContext(startTime + incr * 7));
@@ -380,14 +432,14 @@ public class RocksDBWindowStoreTest {
                                  segments.segmentName(4),
                                  segments.segmentName(5)), segmentDirs(baseDir));
 
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
-        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
-        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
-        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
-        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
-        assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("two"), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - WINDOW_SIZE, startTime + incr * 7 + WINDOW_SIZE)));
 
         context.setRecordContext(createRecordContext(startTime + incr * 8));
         windowStore.put(8, "eight");
@@ -396,15 +448,15 @@ public class RocksDBWindowStoreTest {
                                  segments.segmentName(6)), segmentDirs(baseDir));
 
 
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
-        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
-        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
-        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
-        assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
-        assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - WINDOW_SIZE, startTime + incr * 7 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - WINDOW_SIZE, startTime + incr * 8 + WINDOW_SIZE)));
 
         // check segment directories
         windowStore.flush();
@@ -449,27 +501,27 @@ public class RocksDBWindowStoreTest {
         Utils.delete(baseDir);
 
         windowStore = createWindowStore(context, false, true);
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(7, startTime + incr * 7 - WINDOW_SIZE, startTime + incr * 7 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(8, startTime + incr * 8 - WINDOW_SIZE, startTime + incr * 8 + WINDOW_SIZE)));
 
         context.restore(windowName, changeLog);
 
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
-        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
-        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
-        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
-        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
-        assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
-        assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - WINDOW_SIZE, startTime + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + incr - WINDOW_SIZE, startTime + incr + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + incr * 2 - WINDOW_SIZE, startTime + incr * 2 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList(), toList(windowStore.fetch(3, startTime + incr * 3 - WINDOW_SIZE, startTime + incr * 3 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + incr * 4 - WINDOW_SIZE, startTime + incr * 4 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + incr * 5 - WINDOW_SIZE, startTime + incr * 5 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + incr * 6 - WINDOW_SIZE, startTime + incr * 6 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + incr * 7 - WINDOW_SIZE, startTime + incr * 7 + WINDOW_SIZE)));
+        assertEquals(Utils.mkList("eight"), toList(windowStore.fetch(8, startTime + incr * 8 - WINDOW_SIZE, startTime + incr * 8 + WINDOW_SIZE)));
 
         // check segment directories
         windowStore.flush();
@@ -621,17 +673,19 @@ public class RocksDBWindowStoreTest {
     @SuppressWarnings("unchecked")
     @Test
     public void shouldFetchAndIterateOverExactKeys() throws Exception {
+        final long windowSize = 0x7a00000000000000L;
+        final long retentionPeriod = 0x7a00000000000000L;
         final RocksDBWindowStoreSupplier<String, String> supplier =
                 new RocksDBWindowStoreSupplier<>(
-                        "window",
-                        0x7a00000000000000L, 2,
-                        true,
-                        Serdes.String(),
-                        Serdes.String(),
-                        0x7a00000000000000L,
-                        true,
-                        Collections.<String, String>emptyMap(),
-                        false);
+                    "window",
+                    retentionPeriod, 2,
+                    true,
+                    Serdes.String(),
+                    Serdes.String(),
+                    windowSize,
+                    true,
+                    Collections.<String, String>emptyMap(),
+                    false);
 
         windowStore = supplier.get();
         windowStore.init(context, windowStore);
@@ -645,6 +699,19 @@ public class RocksDBWindowStoreTest {
 
         final List expected = Utils.mkList("0001", "0003", "0005");
         assertThat(toList(windowStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected));
+
+        List<KeyValue<Windowed<String>, String>> list = StreamsTestUtils.toList(windowStore.fetch("a", "a", 0, Long.MAX_VALUE));
+        assertThat(list, equalTo(Utils.mkList(
+            windowedPair("a", "0001", 0, windowSize),
+            windowedPair("a", "0003", 1, windowSize),
+            windowedPair("a", "0005", 0x7a00000000000000L - 1, windowSize)
+        )));
+
+        list = StreamsTestUtils.toList(windowStore.fetch("aa", "aa", 0, Long.MAX_VALUE));
+        assertThat(list, equalTo(Utils.mkList(
+            windowedPair("aa", "0002", 0, windowSize),
+            windowedPair("aa", "0004", 1, windowSize)
+        )));
     }
 
     @SuppressWarnings("unchecked")
@@ -746,4 +813,12 @@ public class RocksDBWindowStoreTest {
 
         return entriesByKey;
     }
+
+    private static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp) {
+        return windowedPair(key, value, timestamp, WINDOW_SIZE);
+    }
+
+    private static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp, long windowSize) {
+        return KeyValue.pair(new Windowed<>(key, WindowStoreUtils.timeWindowForSize(timestamp, windowSize)), value);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java
new file mode 100644
index 0000000..7a26660
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunctionTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+public class SegmentedCacheFunctionTest {
+
+    private static final int SEGMENT_INTERVAL = 17;
+    private static final int TIMESTAMP = 736213517;
+
+    private static final Bytes THE_KEY = WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, TIMESTAMP, 42);
+    private final static Bytes THE_CACHE_KEY = Bytes.wrap(
+        ByteBuffer.allocate(8 + THE_KEY.get().length)
+            .putLong(TIMESTAMP / SEGMENT_INTERVAL)
+            .put(THE_KEY.get()).array()
+    );
+
+    private final SegmentedCacheFunction cacheFunction = new SegmentedCacheFunction(new WindowKeySchema(), SEGMENT_INTERVAL);
+
+    @Test
+    public void key() throws Exception {
+        assertThat(
+            cacheFunction.key(THE_CACHE_KEY),
+            equalTo(THE_KEY)
+        );
+    }
+
+    @Test
+    public void cacheKey() throws Exception {
+        final long segmentId = TIMESTAMP / SEGMENT_INTERVAL;
+
+        final Bytes actualCacheKey = cacheFunction.cacheKey(THE_KEY);
+        final ByteBuffer buffer = ByteBuffer.wrap(actualCacheKey.get());
+
+        assertThat(buffer.getLong(), equalTo(segmentId));
+
+        byte[] actualKey = new byte[buffer.remaining()];
+        buffer.get(actualKey);
+        assertThat(Bytes.wrap(actualKey), equalTo(THE_KEY));
+    }
+
+    @Test
+    public void testRoundTripping() throws Exception {
+        assertThat(
+            cacheFunction.key(cacheFunction.cacheKey(THE_KEY)),
+            equalTo(THE_KEY)
+        );
+
+        assertThat(
+            cacheFunction.cacheKey(cacheFunction.key(THE_CACHE_KEY)),
+            equalTo(THE_CACHE_KEY)
+        );
+    }
+
+    @Test
+    public void compareSegmentedKeys() throws Exception {
+        assertThat(
+            "same key in same segment should be ranked the same",
+            cacheFunction.compareSegmentedKeys(
+                cacheFunction.cacheKey(THE_KEY),
+                THE_KEY
+            ) == 0
+        );
+
+        final Bytes sameKeyInPriorSegment = WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, 1234, 42);
+
+        assertThat(
+            "same keys in different segments should be ordered according to segment",
+            cacheFunction.compareSegmentedKeys(
+                cacheFunction.cacheKey(sameKeyInPriorSegment),
+                THE_KEY
+            ) < 0
+        );
+
+        assertThat(
+            "same keys in different segments should be ordered according to segment",
+            cacheFunction.compareSegmentedKeys(
+                cacheFunction.cacheKey(THE_KEY),
+                sameKeyInPriorSegment
+            ) > 0
+        );
+
+        final Bytes lowerKeyInSameSegment = WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xB}, TIMESTAMP - 1, 0);
+
+        assertThat(
+            "different keys in same segments should be ordered according to key",
+            cacheFunction.compareSegmentedKeys(
+                cacheFunction.cacheKey(THE_KEY),
+                lowerKeyInSameSegment
+            ) > 0
+        );
+
+        assertThat(
+            "different keys in same segments should be ordered according to key",
+            cacheFunction.compareSegmentedKeys(
+                cacheFunction.cacheKey(lowerKeyInSameSegment),
+                THE_KEY
+            ) < 0
+        );
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
index 354cf01..4682174 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionKeySchemaTest.java
@@ -52,17 +52,103 @@ public class SessionKeySchemaTest {
 
     @Test
     public void shouldFetchExactKeysSkippingLongerKeys() throws Exception {
-        final List<Integer> result = getValues(sessionKeySchema.hasNextCondition(Bytes.wrap(new byte[]{0}), 0, Long.MAX_VALUE));
+        final Bytes key = Bytes.wrap(new byte[]{0});
+        final List<Integer> result = getValues(sessionKeySchema.hasNextCondition(key, key, 0, Long.MAX_VALUE));
         assertThat(result, equalTo(Arrays.asList(2, 4)));
     }
 
     @Test
     public void shouldFetchExactKeySkippingShorterKeys() throws Exception {
-        final HasNextCondition hasNextCondition = sessionKeySchema.hasNextCondition(Bytes.wrap(new byte[]{0, 0}), 0, Long.MAX_VALUE);
+        final Bytes key = Bytes.wrap(new byte[]{0, 0});
+        final HasNextCondition hasNextCondition = sessionKeySchema.hasNextCondition(key, key, 0, Long.MAX_VALUE);
         final List<Integer> results = getValues(hasNextCondition);
         assertThat(results, equalTo(Arrays.asList(1, 5)));
     }
 
+    @Test
+    public void testUpperBoundWithLargeTimestamps() throws Exception {
+        Bytes upper = sessionKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), Long.MAX_VALUE);
+
+        assertThat(
+            "shorter key with max timestamp should be in range",
+            upper.compareTo(
+                SessionKeySerde.bytesToBinary(
+                    new Windowed<>(
+                        Bytes.wrap(new byte[]{0xA}),
+                        new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))
+                )
+            ) >= 0
+        );
+
+        assertThat(
+            "shorter key with max timestamp should be in range",
+            upper.compareTo(
+                SessionKeySerde.bytesToBinary(
+                    new Windowed<>(
+                        Bytes.wrap(new byte[]{0xA, 0xB}),
+                        new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))
+                )
+            ) >= 0
+        );
+
+        assertThat(upper, equalTo(SessionKeySerde.bytesToBinary(
+            new Windowed<>(Bytes.wrap(new byte[]{0xA}), new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))))
+        );
+    }
+
+    @Test
+    public void testUpperBoundWithKeyBytesLargerThanFirstTimestampByte() throws Exception {
+        Bytes upper = sessionKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, (byte) 0x8F, (byte) 0x9F}), Long.MAX_VALUE);
+
+        assertThat(
+            "shorter key with max timestamp should be in range",
+            upper.compareTo(
+                SessionKeySerde.bytesToBinary(
+                    new Windowed<>(
+                        Bytes.wrap(new byte[]{0xA, (byte) 0x8F}),
+                        new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))
+                )
+            ) >= 0
+        );
+
+        assertThat(upper, equalTo(SessionKeySerde.bytesToBinary(
+            new Windowed<>(Bytes.wrap(new byte[]{0xA, (byte) 0x8F, (byte) 0x9F}), new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))))
+        );
+    }
+
+    @Test
+    public void testUpperBoundWithZeroTimestamp() throws Exception {
+        Bytes upper = sessionKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0);
+
+        assertThat(upper, equalTo(SessionKeySerde.bytesToBinary(
+            new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0))))
+        );
+    }
+
+    @Test
+    public void testLowerBoundWithZeroTimestamp() throws Exception {
+        Bytes lower = sessionKeySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0);
+        assertThat(lower, equalTo(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0)))));
+    }
+
+    @Test
+    public void testLowerBoundMatchesTrailingZeros() throws Exception {
+        Bytes lower = sessionKeySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), Long.MAX_VALUE);
+
+        assertThat(
+            "appending zeros to key should still be in range",
+            lower.compareTo(
+                SessionKeySerde.bytesToBinary(
+                    new Windowed<>(
+                        Bytes.wrap(new byte[]{0xA, 0xB, 0xC, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}),
+                        new SessionWindow(Long.MAX_VALUE, Long.MAX_VALUE))
+                )
+            ) < 0
+        );
+
+        assertThat(lower, equalTo(SessionKeySerde.bytesToBinary(new Windowed<>(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), new SessionWindow(0, 0)))));
+    }
+
 
     private List<Integer> getValues(final HasNextCondition hasNextCondition) {
         final List<Integer> results = new ArrayList<>();
@@ -72,4 +158,4 @@ public class SessionKeySchemaTest {
         return results;
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
new file mode 100644
index 0000000..4a73e74
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsEqual.equalTo;
+
+public class WindowKeySchemaTest {
+
+    private final WindowKeySchema windowKeySchema = new WindowKeySchema();
+
+    @Test
+    public void testUpperBoundWithLargeTimestamps() throws Exception {
+        Bytes upper = windowKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), Long.MAX_VALUE);
+
+        assertThat(
+            "shorter key with max timestamp should be in range",
+            upper.compareTo(
+                WindowStoreUtils.toBinaryKey(
+                    new byte[]{0xA},
+                    Long.MAX_VALUE,
+                    Integer.MAX_VALUE
+                )
+            ) >= 0
+        );
+
+        assertThat(
+            "shorter key with max timestamp should be in range",
+            upper.compareTo(
+                WindowStoreUtils.toBinaryKey(
+                    new byte[]{0xA, 0xB},
+                    Long.MAX_VALUE,
+                    Integer.MAX_VALUE
+                )
+            ) >= 0
+        );
+
+        assertThat(upper, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA}, Long.MAX_VALUE, Integer.MAX_VALUE)));
+    }
+
+    @Test
+    public void testUpperBoundWithKeyBytesLargerThanFirstTimestampByte() throws Exception {
+        Bytes upper = windowKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, (byte) 0x8F, (byte) 0x9F}), Long.MAX_VALUE);
+
+        assertThat(
+            "shorter key with max timestamp should be in range",
+            upper.compareTo(
+                WindowStoreUtils.toBinaryKey(
+                    new byte[]{0xA, (byte) 0x8F},
+                    Long.MAX_VALUE,
+                    Integer.MAX_VALUE
+                )
+            ) >= 0
+        );
+
+        assertThat(upper, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA, (byte) 0x8F, (byte) 0x9F}, Long.MAX_VALUE, Integer.MAX_VALUE)));
+    }
+
+
+    @Test
+    public void testUpperBoundWithKeyBytesLargerAndSmallerThanFirstTimestampByte() throws Exception {
+        Bytes upper = windowKeySchema.upperRange(Bytes.wrap(new byte[]{0xC, 0xC, 0x9}), 0x0AffffffffffffffL);
+
+        assertThat(
+            "shorter key with max timestamp should be in range",
+            upper.compareTo(
+                WindowStoreUtils.toBinaryKey(
+                    new byte[]{0xC, 0xC},
+                    0x0AffffffffffffffL,
+                    Integer.MAX_VALUE
+                )
+            ) >= 0
+        );
+
+        assertThat(upper, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xC, 0xC}, 0x0AffffffffffffffL, Integer.MAX_VALUE)));
+    }
+
+    @Test
+    public void testUpperBoundWithZeroTimestamp() throws Exception {
+        Bytes upper = windowKeySchema.upperRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0);
+        assertThat(upper, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, 0, Integer.MAX_VALUE)));
+    }
+
+    @Test
+    public void testLowerBoundWithZeroTimestamp() throws Exception {
+        Bytes lower = windowKeySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 0);
+        assertThat(lower, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, 0, 0)));
+    }
+
+    @Test
+    public void testLowerBoundWithMonZeroTimestamp() throws Exception {
+        Bytes lower = windowKeySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), 42);
+        assertThat(lower, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, 0, 0)));
+    }
+
+    @Test
+    public void testLowerBoundMatchesTrailingZeros() throws Exception {
+        Bytes lower = windowKeySchema.lowerRange(Bytes.wrap(new byte[]{0xA, 0xB, 0xC}), Long.MAX_VALUE - 1);
+
+        assertThat(
+            "appending zeros to key should still be in range",
+            lower.compareTo(
+                WindowStoreUtils.toBinaryKey(
+                        new byte[]{0xA, 0xB, 0xC, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
+                        Long.MAX_VALUE - 1,
+                        0
+                )
+            ) < 0
+        );
+
+        assertThat(lower, equalTo(WindowStoreUtils.toBinaryKey(new byte[]{0xA, 0xB, 0xC}, 0, 0)));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
index f7fd53b..ed408e6 100644
--- a/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/ReadOnlySessionStoreStub.java
@@ -26,12 +26,13 @@ import org.apache.kafka.streams.state.ReadOnlySessionStore;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 
 public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V>, StateStore {
-    private Map<K, List<KeyValue<Windowed<K>, V>>> sessions = new HashMap<>();
+    private NavigableMap<K, List<KeyValue<Windowed<K>, V>>> sessions = new TreeMap<>();
     private boolean open = true;
 
     public void put(final Windowed<K> sessionKey, final V value) {
@@ -53,6 +54,44 @@ public class ReadOnlySessionStoreStub<K, V> implements ReadOnlySessionStore<K, V
     }
 
     @Override
+    public KeyValueIterator<Windowed<K>, V> fetch(K from, K to) { // all sessions with from <= key <= to, in key order
+        if (!open) {
+            throw new InvalidStateStoreException("not open");
+        }
+        if (sessions.subMap(from, true, to, true).isEmpty()) { // inclusive bounds, so fetch(k, k) finds k
+            return new KeyValueIteratorStub<>(Collections.<KeyValue<Windowed<K>, V>>emptyIterator());
+        }
+        final Iterator<List<KeyValue<Windowed<K>, V>>> keysIterator = sessions.subMap(from, true, to, true).values().iterator();
+        return new KeyValueIteratorStub<>(
+            new Iterator<KeyValue<Windowed<K>, V>>() {
+                // iterator over the current key's sessions; stays null until hasNext() reaches the first key
+                Iterator<KeyValue<Windowed<K>, V>> it;
+
+                @Override
+                public boolean hasNext() {
+                    while (it == null || !it.hasNext()) { // skip exhausted or empty per-key lists
+                        if (!keysIterator.hasNext()) {
+                            return false;
+                        }
+                        it = keysIterator.next().iterator();
+                    }
+                    return true;
+                }
+
+                @Override
+                public KeyValue<Windowed<K>, V> next() {
+                    return it.next();
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            }
+        );
+    }
+
+    @Override
     public String name() {
         return "";
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9170e87b/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java b/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java
index 6ab70ae..7f8457c 100644
--- a/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/SegmentedBytesStoreStub.java
@@ -50,6 +50,11 @@ public class SegmentedBytesStoreStub implements SegmentedBytesStore {
 
     @Override
     public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long from, final long to) {
+        return fetch(key, key, from, to);
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> fetch(Bytes keyFrom, Bytes keyTo, long from, long to) {
         fetchCalled = true;
         return new KeyValueIteratorStub<>(Collections.<KeyValue<Bytes, byte[]>>emptyIterator());
     }


Mime
View raw message