kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-3104: add windowed aggregation to KStream
Date Mon, 18 Jan 2016 20:14:55 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk cc3570d1a -> a62eb5993


http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
index 5189318..2f30712 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
@@ -20,6 +20,7 @@
 package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
 import java.text.SimpleDateFormat;
@@ -34,6 +35,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     public static final long MIN_SEGMENT_INTERVAL = 60 * 1000; // one minute
 
+    private static final long USE_CURRENT_TIMESTAMP = -1L;
+
     private static class Segment extends RocksDBStore<byte[], byte[]> {
         public final long id;
 
@@ -73,11 +76,14 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         }
 
         @Override
-        public V next() {
+        public KeyValue<Long, V> next() {
             if (index >= iterators.length)
                 throw new NoSuchElementException();
 
-            return serdes.valueFrom(iterators[index].next().value());
+            Entry<byte[], byte[]> entry = iterators[index].next();
+
+            return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(entry.key()),
+                                  serdes.valueFrom(entry.value()));
         }
 
         @Override
@@ -86,6 +92,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
                 iterators[index].remove();
         }
 
+        @Override
         public void close() {
             for (KeyValueIterator<byte[], byte[]> iterator : iterators) {
                 iterator.close();
@@ -94,9 +101,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     }
 
     private final String name;
-    private final long windowBefore;
-    private final long windowAfter;
     private final long segmentInterval;
+    private final boolean retainDuplicates;
     private final Segment[] segments;
     private final Serdes<K, V> serdes;
     private final SimpleDateFormat formatter;
@@ -105,14 +111,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     private long currentSegmentId = -1L;
     private int seqnum = 0;
 
-    public RocksDBWindowStore(String name, long windowBefore, long windowAfter, long retentionPeriod, int numSegments, Serdes<K, V> serdes) {
+    public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes) {
         this.name = name;
-        this.windowBefore = windowBefore;
-        this.windowAfter = windowAfter;
-
-        // The retention period must be at least two times as long as the total window size
-        if ((this.windowBefore + this.windowAfter + 1) * 2 > retentionPeriod)
-            retentionPeriod = (this.windowBefore + this.windowAfter + 1) * 2;
 
         // The segment interval must be greater than MIN_SEGMENT_INTERVAL
         this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
@@ -120,6 +120,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         this.segments = new Segment[numSegments];
         this.serdes = serdes;
 
+        this.retainDuplicates = retainDuplicates;
+
         // Create a date formatter. Formatted timestamps are used as segment name suffixes
         this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
         this.formatter.setTimeZone(new SimpleTimeZone(0, "GMT"));
@@ -158,12 +160,18 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     @Override
     public void put(K key, V value) {
-        putAndReturnInternalKey(key, value);
+        putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP);
     }
 
     @Override
-    public byte[] putAndReturnInternalKey(K key, V value) {
-        long timestamp = context.timestamp();
+    public void put(K key, V value, long timestamp) {
+        putAndReturnInternalKey(key, value, timestamp);
+    }
+
+    @Override
+    public byte[] putAndReturnInternalKey(K key, V value, long t) {
+        long timestamp = t == USE_CURRENT_TIMESTAMP ? context.timestamp() : t;
+
         long segmentId = segmentId(timestamp);
 
         if (segmentId > currentSegmentId) {
@@ -174,7 +182,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
         // If the record is within the retention period, put it in the store.
         if (segmentId > currentSegmentId - segments.length) {
-            seqnum = (seqnum + 1) & 0x7FFFFFFF;
+            if (retainDuplicates)
+                seqnum = (seqnum + 1) & 0x7FFFFFFF;
             byte[] binaryKey = WindowStoreUtil.toBinaryKey(key, timestamp, seqnum, serdes);
             getSegment(segmentId).put(binaryKey, serdes.rawValue(value));
             return binaryKey;
@@ -213,10 +222,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     @SuppressWarnings("unchecked")
     @Override
-    public WindowStoreIterator<V> fetch(K key, long timestamp) {
-        long timeFrom = Math.max(0L, timestamp - windowBefore);
-        long timeTo = Math.max(0L, timestamp + windowAfter);
-
+    public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
         long segFrom = segmentId(timeFrom);
         long segTo = segmentId(Math.max(0L, timeTo));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
index 73814ef..fcdcb9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
@@ -32,18 +32,16 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
 public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
 
     private final String name;
-    private final long windowBefore;
-    private final long windowAfter;
     private final long retentionPeriod;
+    private final boolean retainDuplicates;
     private final int numSegments;
     private final Serdes serdes;
     private final Time time;
 
-    public RocksDBWindowStoreSupplier(String name, long windowBefore, long windowAfter, long retentionPeriod, int numSegments, Serdes<K, V> serdes, Time time) {
+    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes, Time time) {
         this.name = name;
-        this.windowBefore = windowBefore;
-        this.windowAfter = windowAfter;
         this.retentionPeriod = retentionPeriod;
+        this.retainDuplicates = retainDuplicates;
         this.numSegments = numSegments;
         this.serdes = serdes;
         this.time = time;
@@ -54,7 +52,7 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
     }
 
     public StateStore get() {
-        return new MeteredWindowStore<>(new RocksDBWindowStore<K, V>(name, windowBefore, windowAfter, retentionPeriod, numSegments, serdes), "rocksdb-window", time);
+        return new MeteredWindowStore<>(new RocksDBWindowStore<K, V>(name, retentionPeriod, numSegments, retainDuplicates, serdes), "rocksdb-window", time);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index 344aa91..b17d889 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -25,9 +25,11 @@ public interface WindowStore<K, V> extends StateStore {
 
     void put(K key, V value);
 
-    byte[] putAndReturnInternalKey(K key, V value);
+    void put(K key, V value, long timestamp);
 
-    WindowStoreIterator<V> fetch(K key, long timestamp);
+    byte[] putAndReturnInternalKey(K key, V value, long timestamp);
+
+    WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
 
     void putInternal(byte[] binaryKey, byte[] binaryValue);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
index e57a00f..55d1ac3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
@@ -19,8 +19,10 @@
 
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.streams.kstream.KeyValue;
+
 import java.util.Iterator;
 
-public interface WindowStoreIterator<E> extends Iterator<E> {
+public interface WindowStoreIterator<E> extends Iterator<KeyValue<Long, E>> {
     void close();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
new file mode 100644
index 0000000..ba596a9
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.AggregatorSupplier;
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamAggregateTest {
+
+    private final Serializer<String> strSerializer = new StringSerializer();
+    private final Deserializer<String> strDeserializer = new StringDeserializer();
+
+    private class StringCanonizeSupplier implements AggregatorSupplier<String, String, String> {
+
+        private class StringCanonizer implements Aggregator<String, String, String> {
+
+            @Override
+            public String initialValue() {
+                return "0";
+            }
+
+            @Override
+            public String add(String aggKey, String value, String aggregate) {
+                return aggregate + "+" + value;
+            }
+
+            @Override
+            public String remove(String aggKey, String value, String aggregate) {
+                return aggregate + "-" + value;
+            }
+
+            @Override
+            public String merge(String aggr1, String aggr2) {
+                return "(" + aggr1 + ") + (" + aggr2 + ")";
+            }
+        }
+
+        @Override
+        public Aggregator<String, String, String> get() {
+            return new StringCanonizer();
+        }
+    }
+
+    @Test
+    public void testAggBasic() throws Exception {
+        final File baseDir = Files.createTempDirectory("test").toFile();
+
+        try {
+            final KStreamBuilder builder = new KStreamBuilder();
+            String topic1 = "topic1";
+
+            KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1);
+            KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringCanonizeSupplier(),
+                    HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
+                    strSerializer,
+                    strSerializer,
+                    strDeserializer,
+                    strDeserializer);
+
+            MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
+            table2.toStream().process(proc2);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+
+            driver.setTime(0L);
+            driver.process(topic1, "A", "1");
+            driver.setTime(1L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(2L);
+            driver.process(topic1, "C", "3");
+            driver.setTime(3L);
+            driver.process(topic1, "D", "4");
+            driver.setTime(4L);
+            driver.process(topic1, "A", "1");
+
+            driver.setTime(5L);
+            driver.process(topic1, "A", "1");
+            driver.setTime(6L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(7L);
+            driver.process(topic1, "D", "4");
+            driver.setTime(8L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(9L);
+            driver.process(topic1, "C", "3");
+
+            driver.setTime(10L);
+            driver.process(topic1, "A", "1");
+            driver.setTime(11L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(12L);
+            driver.process(topic1, "D", "4");
+            driver.setTime(13L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(14L);
+            driver.process(topic1, "C", "3");
+
+            assertEquals(Utils.mkList(
+                    "[A@0]:0+1",
+                    "[B@0]:0+2",
+                    "[C@0]:0+3",
+                    "[D@0]:0+4",
+                    "[A@0]:0+1+1",
+
+                    "[A@0]:0+1+1+1", "[A@5]:0+1",
+                    "[B@0]:0+2+2", "[B@5]:0+2",
+                    "[D@0]:0+4+4", "[D@5]:0+4",
+                    "[B@0]:0+2+2+2", "[B@5]:0+2+2",
+                    "[C@0]:0+3+3", "[C@5]:0+3",
+
+                    "[A@5]:0+1+1", "[A@10]:0+1",
+                    "[B@5]:0+2+2+2", "[B@10]:0+2",
+                    "[D@5]:0+4+4", "[D@10]:0+4",
+                    "[B@5]:0+2+2+2+2", "[B@10]:0+2+2",
+                    "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed);
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 90341a8..e763fd2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -125,7 +125,7 @@ public class KStreamKStreamJoinTest {
             // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
             // w2 = { 0:Y0, 1:Y1 }
             // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
-            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
@@ -137,7 +137,7 @@ public class KStreamKStreamJoinTest {
             // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
             // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
             // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
 
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
@@ -149,7 +149,7 @@ public class KStreamKStreamJoinTest {
             // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
             // w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
             // --> w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
-            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
+            //     w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3, 0:YYY0, 1:YYY1 }
 
             for (int i = 0; i < 2; i++) {
                 driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 189cf9d..b5037ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -47,7 +47,7 @@ public class KTableAggregateTest {
 
             @Override
             public String initialValue() {
-                return "";
+                return "0";
             }
 
             @Override
@@ -106,14 +106,14 @@ public class KTableAggregateTest {
             driver.process(topic1, "C", "8");
 
             assertEquals(Utils.mkList(
-                    "A:+1",
-                    "B:+2",
-                    "A:+1+3", "A:+1+3-1",
-                    "B:+2+4", "B:+2+4-2",
-                    "C:+5",
-                    "D:+6",
-                    "B:+2+4-2+7", "B:+2+4-2+7-4",
-                    "C:+5+8", "C:+5+8-5"), proc2.processed);
+                    "A:0+1",
+                    "B:0+2",
+                    "A:0+1+3", "A:0+1+3-1",
+                    "B:0+2+4", "B:0+2+4-2",
+                    "C:0+5",
+                    "D:0+6",
+                    "B:0+2+4-2+7", "B:0+2+4-2+7-4",
+                    "C:0+5+8", "C:0+5+8-5"), proc2.processed);
 
         } finally {
             Utils.delete(baseDir);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java
new file mode 100644
index 0000000..f9b6ba5
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.HoppingWindows;
+import org.apache.kafka.streams.kstream.TumblingWindows;
+import org.apache.kafka.streams.kstream.UnlimitedWindows;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class WindowsTest {
+
+    @Test
+    public void hoppingWindows() {
+
+        HoppingWindows windows = HoppingWindows.of("test").with(12L).every(5L);
+
+        Map<Long, HoppingWindow> matched = windows.windowsFor(21L);
+
+        assertEquals(3, matched.size());
+
+        assertEquals(new HoppingWindow(10L, 22L), matched.get(10L));
+        assertEquals(new HoppingWindow(15L, 27L), matched.get(15L));
+        assertEquals(new HoppingWindow(20L, 32L), matched.get(20L));
+    }
+
+    @Test
+    public void tumblineWindows() {
+
+        TumblingWindows windows = TumblingWindows.of("test").with(12L);
+
+        Map<Long, TumblingWindow> matched = windows.windowsFor(21L);
+
+        assertEquals(1, matched.size());
+
+        assertEquals(new TumblingWindow(12L, 24L), matched.get(12L));
+    }
+
+    @Test
+    public void unlimitedWindows() {
+
+        UnlimitedWindows windows = UnlimitedWindows.of("test").startOn(10L);
+
+        Map<Long, UnlimitedWindow> matched = windows.windowsFor(21L);
+
+        assertEquals(1, matched.size());
+
+        assertEquals(new UnlimitedWindow(10L), matched.get(10L));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a62eb599/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
index 6bfddfe..fc7a4e9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
@@ -53,13 +53,11 @@ public class RocksDBWindowStoreTest {
     private final long windowSize = 3;
     private final Serdes<Integer, String> serdes = Serdes.withBuiltinTypes("", Integer.class, String.class);
 
-
-    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, long windowBefore, long windowAfter, Serdes<K, V> serdes) {
-        StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", windowBefore, windowAfter, retentionPeriod, numSegments, serdes, null);
+    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, Serdes<K, V> serdes) {
+        StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", retentionPeriod, numSegments, true, serdes, null);
         WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
         store.init(context);
         return store;
-
     }
 
     @Test
@@ -83,7 +81,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
             try {
                 long startTime = segmentSize - 4L;
 
@@ -100,12 +98,12 @@ public class RocksDBWindowStoreTest {
                 context.setTime(startTime + 5L);
                 store.put(5, "five");
 
-                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L)));
-                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L)));
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize)));
 
                 context.setTime(startTime + 3L);
                 store.put(2, "two+1");
@@ -120,21 +118,21 @@ public class RocksDBWindowStoreTest {
                 context.setTime(startTime + 8L);
                 store.put(2, "two+6");
 
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L)));
-                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L)));
-                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L)));
-                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L)));
-                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L)));
-                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L)));
-                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L)));
-                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime - windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize)));
+                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize)));
+                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize)));
+                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize)));
+                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize)));
+                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize)));
+                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize)));
 
                 // Flush the store and verify all current entries were properly flushed ...
                 store.flush();
@@ -179,7 +177,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, windowSize, 0, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
             try {
                 long startTime = segmentSize - 4L;
 
@@ -196,12 +194,12 @@ public class RocksDBWindowStoreTest {
                 context.setTime(startTime + 5L);
                 store.put(5, "five");
 
-                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L)));
-                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L)));
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L)));
 
                 context.setTime(startTime + 3L);
                 store.put(2, "two+1");
@@ -216,21 +214,21 @@ public class RocksDBWindowStoreTest {
                 context.setTime(startTime + 8L);
                 store.put(2, "two+6");
 
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L)));
-                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L)));
-                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L)));
-                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L)));
-                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L)));
-                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L)));
-                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L)));
-                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L - windowSize, startTime + 0L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
+                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L)));
+                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L)));
+                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L)));
+                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L)));
+                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L)));
+                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L)));
+                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L - windowSize, startTime + 13L)));
 
                 // Flush the store and verify all current entries were properly flushed ...
                 store.flush();
@@ -275,7 +273,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, 0, windowSize, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
             try {
                 long startTime = segmentSize - 4L;
 
@@ -292,12 +290,12 @@ public class RocksDBWindowStoreTest {
                 context.setTime(startTime + 5L);
                 store.put(5, "five");
 
-                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L)));
-                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L)));
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L, startTime + 0L + windowSize)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L, startTime + 1L + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L, startTime + 3L + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L, startTime + 4L + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L, startTime + 5L + windowSize)));
 
                 context.setTime(startTime + 3L);
                 store.put(2, "two+1");
@@ -312,21 +310,21 @@ public class RocksDBWindowStoreTest {
                 context.setTime(startTime + 8L);
                 store.put(2, "two+6");
 
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L)));
-                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L)));
-                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L)));
-                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L)));
-                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L)));
-                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L)));
-                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L)));
-                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L, startTime - 2L + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L, startTime - 1L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime, startTime + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L, startTime + 1L + windowSize)));
+                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
+                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L, startTime + 3L + windowSize)));
+                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L, startTime + 4L + windowSize)));
+                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L, startTime + 5L + windowSize)));
+                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L, startTime + 6L + windowSize)));
+                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L, startTime + 7L + windowSize)));
+                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L, startTime + 8L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L, startTime + 9L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L, startTime + 10L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L, startTime + 11L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L, startTime + 12L + windowSize)));
 
                 // Flush the store and verify all current entries were properly flushed ...
                 store.flush();
@@ -371,14 +369,14 @@ public class RocksDBWindowStoreTest {
                     byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
             try {
                 long startTime = segmentSize - 4L;
 
                 context.setTime(startTime);
                 store.put(0, "zero");
 
-                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime)));
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
 
                 context.setTime(startTime);
                 store.put(0, "zero");
@@ -387,11 +385,11 @@ public class RocksDBWindowStoreTest {
                 context.setTime(startTime);
                 store.put(0, "zero++");
 
-                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime)));
-                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L)));
-                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L)));
-                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize)));
 
                 // Flush the store and verify all current entries were properly flushed ...
                 store.flush();
@@ -430,7 +428,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
             RocksDBWindowStore<Integer, String> inner =
                     (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
             try {
@@ -461,51 +459,52 @@ public class RocksDBWindowStoreTest {
                 store.put(5, "five");
                 assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
 
-                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime)));
-                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5)));
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
 
                 context.setTime(startTime + incr * 6);
                 store.put(6, "six");
                 assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
 
-                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime)));
-                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5)));
-                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6)));
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+
 
                 context.setTime(startTime + incr * 7);
                 store.put(7, "seven");
                 assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
 
-                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime)));
-                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5)));
-                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6)));
-                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7)));
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
 
                 context.setTime(startTime + incr * 8);
                 store.put(8, "eight");
                 assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
 
-                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime)));
-                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5)));
-                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6)));
-                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7)));
-                assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8)));
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+                assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
 
                 // check segment directories
                 store.flush();
@@ -546,7 +545,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
             try {
                 context.setTime(startTime);
                 store.put(0, "zero");
@@ -595,7 +594,7 @@ public class RocksDBWindowStoreTest {
                     byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
                     recordCollector);
 
-            WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+            WindowStore<Integer, String> store = createWindowStore(context, serdes);
             RocksDBWindowStore<Integer, String> inner =
                     (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
 
@@ -604,15 +603,15 @@ public class RocksDBWindowStoreTest {
 
                 assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
 
-                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime)));
-                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5)));
-                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6)));
-                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7)));
-                assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8)));
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+                assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
 
                 // check segment directories
                 store.flush();
@@ -633,7 +632,7 @@ public class RocksDBWindowStoreTest {
     private <E> List<E> toList(WindowStoreIterator<E> iterator) {
         ArrayList<E> list = new ArrayList<>();
         while (iterator.hasNext()) {
-            list.add(iterator.next());
+            list.add(iterator.next().value);
         }
         return list;
     }


Mime
View raw message