kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [2/5] kafka git commit: KAFKA-3121: Remove aggregatorSupplier and add Reduce functions
Date Thu, 21 Jan 2016 00:10:48 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
deleted file mode 100644
index fc7a4e9..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
+++ /dev/null
@@ -1,671 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.test.MockProcessorContext;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-public class RocksDBWindowStoreTest {
-
-    private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
-    private final ByteArrayDeserializer byteArrayDeserializer = new ByteArrayDeserializer();
-    private final int numSegments = 3;
-    private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL;
-    private final long retentionPeriod = segmentSize * (numSegments - 1);
-    private final long windowSize = 3;
-    private final Serdes<Integer, String> serdes = Serdes.withBuiltinTypes("", Integer.class, String.class);
-
-    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, Serdes<K, V> serdes) {
-        StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", retentionPeriod, numSegments, true, serdes, null);
-        WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
-        store.init(context);
-        return store;
-    }
-
-    @Test
-    public void testPutAndFetch() throws IOException {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
-            RecordCollector recordCollector = new RecordCollector(producer) {
-                @Override
-                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
-                                    keySerializer.serialize(record.topic(), record.key()),
-                                    valueSerializer.serialize(record.topic(), record.value()))
-                    );
-                }
-            };
-
-            MockProcessorContext context = new MockProcessorContext(
-                    null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
-                    recordCollector);
-
-            WindowStore<Integer, String> store = createWindowStore(context, serdes);
-            try {
-                long startTime = segmentSize - 4L;
-
-                context.setTime(startTime + 0L);
-                store.put(0, "zero");
-                context.setTime(startTime + 1L);
-                store.put(1, "one");
-                context.setTime(startTime + 2L);
-                store.put(2, "two");
-                context.setTime(startTime + 3L);
-                // (3, "three") is not put
-                context.setTime(startTime + 4L);
-                store.put(4, "four");
-                context.setTime(startTime + 5L);
-                store.put(5, "five");
-
-                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize)));
-                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize)));
-
-                context.setTime(startTime + 3L);
-                store.put(2, "two+1");
-                context.setTime(startTime + 4L);
-                store.put(2, "two+2");
-                context.setTime(startTime + 5L);
-                store.put(2, "two+3");
-                context.setTime(startTime + 6L);
-                store.put(2, "two+4");
-                context.setTime(startTime + 7L);
-                store.put(2, "two+5");
-                context.setTime(startTime + 8L);
-                store.put(2, "two+6");
-
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize)));
-                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime - windowSize, startTime + windowSize)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize)));
-                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize)));
-                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize)));
-                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize)));
-                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize)));
-                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize)));
-                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize)));
-
-                // Flush the store and verify all current entries were properly flushed ...
-                store.flush();
-
-                Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
-
-                assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
-                assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
-                assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
-                assertNull(entriesByKey.get(3));
-                assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
-                assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
-                assertNull(entriesByKey.get(6));
-
-            } finally {
-                store.close();
-            }
-
-        } finally {
-            Utils.delete(baseDir);
-        }
-    }
-
-    @Test
-    public void testPutAndFetchBefore() throws IOException {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
-            RecordCollector recordCollector = new RecordCollector(producer) {
-                @Override
-                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
-                                    keySerializer.serialize(record.topic(), record.key()),
-                                    valueSerializer.serialize(record.topic(), record.value()))
-                    );
-                }
-            };
-
-            MockProcessorContext context = new MockProcessorContext(
-                    null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
-                    recordCollector);
-
-            WindowStore<Integer, String> store = createWindowStore(context, serdes);
-            try {
-                long startTime = segmentSize - 4L;
-
-                context.setTime(startTime + 0L);
-                store.put(0, "zero");
-                context.setTime(startTime + 1L);
-                store.put(1, "one");
-                context.setTime(startTime + 2L);
-                store.put(2, "two");
-                context.setTime(startTime + 3L);
-                // (3, "three") is not put
-                context.setTime(startTime + 4L);
-                store.put(4, "four");
-                context.setTime(startTime + 5L);
-                store.put(5, "five");
-
-                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L)));
-                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L)));
-
-                context.setTime(startTime + 3L);
-                store.put(2, "two+1");
-                context.setTime(startTime + 4L);
-                store.put(2, "two+2");
-                context.setTime(startTime + 5L);
-                store.put(2, "two+3");
-                context.setTime(startTime + 6L);
-                store.put(2, "two+4");
-                context.setTime(startTime + 7L);
-                store.put(2, "two+5");
-                context.setTime(startTime + 8L);
-                store.put(2, "two+6");
-
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L - windowSize, startTime + 0L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
-                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L)));
-                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L)));
-                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L)));
-                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L)));
-                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L)));
-                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L)));
-                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L - windowSize, startTime + 13L)));
-
-                // Flush the store and verify all current entries were properly flushed ...
-                store.flush();
-
-                Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
-
-                assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
-                assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
-                assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
-                assertNull(entriesByKey.get(3));
-                assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
-                assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
-                assertNull(entriesByKey.get(6));
-
-            } finally {
-                store.close();
-            }
-
-        } finally {
-            Utils.delete(baseDir);
-        }
-    }
-
-    @Test
-    public void testPutAndFetchAfter() throws IOException {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
-            RecordCollector recordCollector = new RecordCollector(producer) {
-                @Override
-                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
-                                    keySerializer.serialize(record.topic(), record.key()),
-                                    valueSerializer.serialize(record.topic(), record.value()))
-                    );
-                }
-            };
-
-            MockProcessorContext context = new MockProcessorContext(
-                    null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
-                    recordCollector);
-
-            WindowStore<Integer, String> store = createWindowStore(context, serdes);
-            try {
-                long startTime = segmentSize - 4L;
-
-                context.setTime(startTime + 0L);
-                store.put(0, "zero");
-                context.setTime(startTime + 1L);
-                store.put(1, "one");
-                context.setTime(startTime + 2L);
-                store.put(2, "two");
-                context.setTime(startTime + 3L);
-                // (3, "three") is not put
-                context.setTime(startTime + 4L);
-                store.put(4, "four");
-                context.setTime(startTime + 5L);
-                store.put(5, "five");
-
-                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L, startTime + 0L + windowSize)));
-                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L, startTime + 1L + windowSize)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L, startTime + 3L + windowSize)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L, startTime + 4L + windowSize)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L, startTime + 5L + windowSize)));
-
-                context.setTime(startTime + 3L);
-                store.put(2, "two+1");
-                context.setTime(startTime + 4L);
-                store.put(2, "two+2");
-                context.setTime(startTime + 5L);
-                store.put(2, "two+3");
-                context.setTime(startTime + 6L);
-                store.put(2, "two+4");
-                context.setTime(startTime + 7L);
-                store.put(2, "two+5");
-                context.setTime(startTime + 8L);
-                store.put(2, "two+6");
-
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L, startTime - 2L + windowSize)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L, startTime - 1L + windowSize)));
-                assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime, startTime + windowSize)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L, startTime + 1L + windowSize)));
-                assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
-                assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L, startTime + 3L + windowSize)));
-                assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L, startTime + 4L + windowSize)));
-                assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L, startTime + 5L + windowSize)));
-                assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L, startTime + 6L + windowSize)));
-                assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L, startTime + 7L + windowSize)));
-                assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L, startTime + 8L + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L, startTime + 9L + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L, startTime + 10L + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L, startTime + 11L + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L, startTime + 12L + windowSize)));
-
-                // Flush the store and verify all current entries were properly flushed ...
-                store.flush();
-
-                Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
-
-                assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
-                assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
-                assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
-                assertNull(entriesByKey.get(3));
-                assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
-                assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
-                assertNull(entriesByKey.get(6));
-
-            } finally {
-                store.close();
-            }
-
-        } finally {
-            Utils.delete(baseDir);
-        }
-    }
-
-    @Test
-    public void testPutSameKeyTimestamp() throws IOException {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
-            RecordCollector recordCollector = new RecordCollector(producer) {
-                @Override
-                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
-                                    keySerializer.serialize(record.topic(), record.key()),
-                                    valueSerializer.serialize(record.topic(), record.value()))
-                    );
-                }
-            };
-
-            MockProcessorContext context = new MockProcessorContext(
-                    null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
-                    recordCollector);
-
-            WindowStore<Integer, String> store = createWindowStore(context, serdes);
-            try {
-                long startTime = segmentSize - 4L;
-
-                context.setTime(startTime);
-                store.put(0, "zero");
-
-                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
-
-                context.setTime(startTime);
-                store.put(0, "zero");
-                context.setTime(startTime);
-                store.put(0, "zero+");
-                context.setTime(startTime);
-                store.put(0, "zero++");
-
-                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
-                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize)));
-                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize)));
-                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize)));
-
-                // Flush the store and verify all current entries were properly flushed ...
-                store.flush();
-
-                Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
-
-                assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
-
-            } finally {
-                store.close();
-            }
-
-        } finally {
-            Utils.delete(baseDir);
-        }
-    }
-
-    @Test
-    public void testRolling() throws IOException {
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
-            RecordCollector recordCollector = new RecordCollector(producer) {
-                @Override
-                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
-                                    keySerializer.serialize(record.topic(), record.key()),
-                                    valueSerializer.serialize(record.topic(), record.value()))
-                    );
-                }
-            };
-
-            MockProcessorContext context = new MockProcessorContext(
-                    null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
-                    recordCollector);
-
-            WindowStore<Integer, String> store = createWindowStore(context, serdes);
-            RocksDBWindowStore<Integer, String> inner =
-                    (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
-            try {
-                long startTime = segmentSize * 2;
-                long incr = segmentSize / 2;
-
-                context.setTime(startTime);
-                store.put(0, "zero");
-                assertEquals(Utils.mkSet(2L), inner.segmentIds());
-
-                context.setTime(startTime + incr);
-                store.put(1, "one");
-                assertEquals(Utils.mkSet(2L), inner.segmentIds());
-
-                context.setTime(startTime + incr * 2);
-                store.put(2, "two");
-                assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
-
-                context.setTime(startTime + incr * 3);
-                // (3, "three") is not put
-                assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
-
-                context.setTime(startTime + incr * 4);
-                store.put(4, "four");
-                assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
-
-                context.setTime(startTime + incr * 5);
-                store.put(5, "five");
-                assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
-
-                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
-                assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
-
-                context.setTime(startTime + incr * 6);
-                store.put(6, "six");
-                assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
-
-                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
-                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
-
-
-                context.setTime(startTime + incr * 7);
-                store.put(7, "seven");
-                assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
-
-                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
-                assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
-                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
-                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
-
-                context.setTime(startTime + incr * 8);
-                store.put(8, "eight");
-                assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
-
-                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
-                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
-                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
-                assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
-
-                // check segment directories
-                store.flush();
-                assertEquals(
-                        Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L), inner.directorySuffix(6L)),
-                        segmentDirs(baseDir)
-                );
-            } finally {
-                store.close();
-            }
-
-        } finally {
-            Utils.delete(baseDir);
-        }
-    }
-
-    @Test
-    public void testRestore() throws IOException {
-        final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
-        long startTime = segmentSize * 2;
-        long incr = segmentSize / 2;
-
-        File baseDir = Files.createTempDirectory("test").toFile();
-        try {
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
-            RecordCollector recordCollector = new RecordCollector(producer) {
-                @Override
-                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
-                                    keySerializer.serialize(record.topic(), record.key()),
-                                    valueSerializer.serialize(record.topic(), record.value()))
-                    );
-                }
-            };
-
-            MockProcessorContext context = new MockProcessorContext(
-                    null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
-                    recordCollector);
-
-            WindowStore<Integer, String> store = createWindowStore(context, serdes);
-            try {
-                context.setTime(startTime);
-                store.put(0, "zero");
-                context.setTime(startTime + incr);
-                store.put(1, "one");
-                context.setTime(startTime + incr * 2);
-                store.put(2, "two");
-                context.setTime(startTime + incr * 3);
-                store.put(3, "three");
-                context.setTime(startTime + incr * 4);
-                store.put(4, "four");
-                context.setTime(startTime + incr * 5);
-                store.put(5, "five");
-                context.setTime(startTime + incr * 6);
-                store.put(6, "six");
-                context.setTime(startTime + incr * 7);
-                store.put(7, "seven");
-                context.setTime(startTime + incr * 8);
-                store.put(8, "eight");
-                store.flush();
-
-            } finally {
-                store.close();
-            }
-
-
-        } finally {
-            Utils.delete(baseDir);
-        }
-
-        File baseDir2 = Files.createTempDirectory("test").toFile();
-        try {
-            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
-            RecordCollector recordCollector = new RecordCollector(producer) {
-                @Override
-                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                    changeLog.add(new Entry<>(
-                                    keySerializer.serialize(record.topic(), record.key()),
-                                    valueSerializer.serialize(record.topic(), record.value()))
-                    );
-                }
-            };
-
-            MockProcessorContext context = new MockProcessorContext(
-                    null, baseDir,
-                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
-                    recordCollector);
-
-            WindowStore<Integer, String> store = createWindowStore(context, serdes);
-            RocksDBWindowStore<Integer, String> inner =
-                    (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
-
-            try {
-                context.restore("window", changeLog);
-
-                assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
-
-                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
-                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
-                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
-                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
-                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
-                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
-                assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
-
-                // check segment directories
-                store.flush();
-                assertEquals(
-                        Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L), inner.directorySuffix(6L)),
-                        segmentDirs(baseDir)
-                );
-            } finally {
-                store.close();
-            }
-
-
-        } finally {
-            Utils.delete(baseDir2);
-        }
-    }
-
-    private <E> List<E> toList(WindowStoreIterator<E> iterator) {
-        ArrayList<E> list = new ArrayList<>();
-        while (iterator.hasNext()) {
-            list.add(iterator.next().value);
-        }
-        return list;
-    }
-
-    private Set<String> segmentDirs(File baseDir) {
-        File rocksDbDir = new File(baseDir, "rocksdb");
-        String[] subdirs = rocksDbDir.list();
-
-        HashSet<String> set = new HashSet<>();
-
-        for (String subdir : subdirs) {
-            if (subdir.startsWith("window-"))
-            set.add(subdir.substring(7));
-        }
-        return set;
-    }
-
-    private Map<Integer, Set<String>> entriesByKey(List<Entry<byte[], byte[]>> changeLog, long startTime) {
-        HashMap<Integer, Set<String>> entriesByKey = new HashMap<>();
-
-        for (Entry<byte[], byte[]> entry : changeLog) {
-            long timestamp = WindowStoreUtil.timestampFromBinaryKey(entry.key());
-            Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key(), serdes);
-            String value = entry.value() == null ? null : serdes.valueFrom(entry.value());
-
-            Set<String> entries = entriesByKey.get(key);
-            if (entries == null) {
-                entries = new HashSet<>();
-                entriesByKey.put(key, entries);
-            }
-            entries.add(value + "@" + (timestamp - startTime));
-        }
-
-        return entriesByKey;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
new file mode 100644
index 0000000..2ed698c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
+import org.junit.Test;
+
+public abstract class AbstractKeyValueStoreTest {
+
+    protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context,
+                                                                      Class<K> keyClass, Class<V> valueClass,
+                                                                      boolean useContextSerdes);
+
+    @Test
+    public void testPutGetRange() {
+        // Create the test driver ...
+        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+        try {
+
+            // Verify that the store reads and writes correctly ...
+            store.put(0, "zero");
+            store.put(1, "one");
+            store.put(2, "two");
+            store.put(4, "four");
+            store.put(5, "five");
+            assertEquals(5, driver.sizeOf(store));
+            assertEquals("zero", store.get(0));
+            assertEquals("one", store.get(1));
+            assertEquals("two", store.get(2));
+            assertNull(store.get(3));
+            assertEquals("four", store.get(4));
+            assertEquals("five", store.get(5));
+            store.delete(5);
+
+            // Flush the store and verify all current entries were properly flushed ...
+            store.flush();
+            assertEquals("zero", driver.flushedEntryStored(0));
+            assertEquals("one", driver.flushedEntryStored(1));
+            assertEquals("two", driver.flushedEntryStored(2));
+            assertEquals("four", driver.flushedEntryStored(4));
+            assertEquals(null, driver.flushedEntryStored(5));
+
+            assertEquals(false, driver.flushedEntryRemoved(0));
+            assertEquals(false, driver.flushedEntryRemoved(1));
+            assertEquals(false, driver.flushedEntryRemoved(2));
+            assertEquals(false, driver.flushedEntryRemoved(4));
+            assertEquals(true, driver.flushedEntryRemoved(5));
+
+            // Check range iteration ...
+            try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) {
+                while (iter.hasNext()) {
+                    Entry<Integer, String> entry = iter.next();
+                    if (entry.key().equals(2))
+                        assertEquals("two", entry.value());
+                    else if (entry.key().equals(4))
+                        assertEquals("four", entry.value());
+                    else
+                        fail("Unexpected entry: " + entry);
+                }
+            }
+
+            // Check range iteration when the upper bound is past the largest stored key ...
+            try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) {
+                while (iter.hasNext()) {
+                    Entry<Integer, String> entry = iter.next();
+                    if (entry.key().equals(2))
+                        assertEquals("two", entry.value());
+                    else if (entry.key().equals(4))
+                        assertEquals("four", entry.value());
+                    else
+                        fail("Unexpected entry: " + entry);
+                }
+            }
+        } finally {
+            store.close();
+        }
+    }
+
+    @Test
+    public void testPutGetRangeWithDefaultSerdes() {
+        // Create the test driver ...
+        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+        try {
+
+            // Verify that the store reads and writes correctly ...
+            store.put(0, "zero");
+            store.put(1, "one");
+            store.put(2, "two");
+            store.put(4, "four");
+            store.put(5, "five");
+            assertEquals(5, driver.sizeOf(store));
+            assertEquals("zero", store.get(0));
+            assertEquals("one", store.get(1));
+            assertEquals("two", store.get(2));
+            assertNull(store.get(3));
+            assertEquals("four", store.get(4));
+            assertEquals("five", store.get(5));
+            store.delete(5);
+
+            // Flush the store and verify all current entries were properly flushed ...
+            store.flush();
+            assertEquals("zero", driver.flushedEntryStored(0));
+            assertEquals("one", driver.flushedEntryStored(1));
+            assertEquals("two", driver.flushedEntryStored(2));
+            assertEquals("four", driver.flushedEntryStored(4));
+            assertEquals(null, driver.flushedEntryStored(5));
+
+            assertEquals(false, driver.flushedEntryRemoved(0));
+            assertEquals(false, driver.flushedEntryRemoved(1));
+            assertEquals(false, driver.flushedEntryRemoved(2));
+            assertEquals(false, driver.flushedEntryRemoved(4));
+            assertEquals(true, driver.flushedEntryRemoved(5));
+        } finally {
+            store.close();
+        }
+    }
+
+    @Test
+    public void testRestore() {
+        // Create the test driver ...
+        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+
+        // Add any entries that will be restored to any store
+        // that uses the driver's context ...
+        driver.addEntryToRestoreLog(0, "zero");
+        driver.addEntryToRestoreLog(1, "one");
+        driver.addEntryToRestoreLog(2, "two");
+        driver.addEntryToRestoreLog(4, "four");
+
+        // Create the store, which should register with the context and automatically
+        // receive the restore entries ...
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+        try {
+            // Verify that the store's contents were properly restored ...
+            assertEquals(0, driver.checkForRestoredEntries(store));
+
+            // and there are no other entries ...
+            assertEquals(4, driver.sizeOf(store));
+        } finally {
+            store.close();
+        }
+    }
+
+    @Test
+    public void testRestoreWithDefaultSerdes() {
+        // Create the test driver ...
+        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+
+        // Add any entries that will be restored to any store
+        // that uses the driver's context ...
+        driver.addEntryToRestoreLog(0, "zero");
+        driver.addEntryToRestoreLog(1, "one");
+        driver.addEntryToRestoreLog(2, "two");
+        driver.addEntryToRestoreLog(4, "four");
+
+        // Create the store, which should register with the context and automatically
+        // receive the restore entries ...
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+        try {
+            // Verify that the store's contents were properly restored ...
+            assertEquals(0, driver.checkForRestoredEntries(store));
+
+            // and there are no other entries ...
+            assertEquals(4, driver.sizeOf(store));
+        } finally {
+            store.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
new file mode 100644
index 0000000..2b0927e
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+
+public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(
+            ProcessorContext context,
+            Class<K> keyClass, Class<V> valueClass,
+            boolean useContextSerdes) {
+
+        StateStoreSupplier supplier;
+        if (useContextSerdes) {
+            Serializer<K> keySer = (Serializer<K>) context.keySerializer();
+            Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
+            Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
+            Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
+            supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build();
+        } else {
+            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().build();
+        }
+
+        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+        store.init(context);
+        return store;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
new file mode 100644
index 0000000..d7cc5b9
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.Test;
+
+public class InMemoryLRUCacheStoreTest {
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testPutGetRange() {
+        // Create the test driver ...
+        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
+        StateStoreSupplier supplier = Stores.create("my-store")
+                                                     .withIntegerKeys().withStringValues()
+                                                     .inMemory().maxEntries(3)
+                                                     .build();
+        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
+        store.init(driver.context());
+
+        // Verify that the store reads and writes correctly, keeping only the last 3 entries ...
+        store.put(0, "zero");
+        store.put(1, "one");
+        store.put(2, "two");
+        store.put(3, "three");
+        store.put(4, "four");
+        store.put(5, "five");
+
+        // It should only keep the last 3 added ...
+        assertEquals(3, driver.sizeOf(store));
+        assertNull(store.get(0));
+        assertNull(store.get(1));
+        assertNull(store.get(2));
+        assertEquals("three", store.get(3));
+        assertEquals("four", store.get(4));
+        assertEquals("five", store.get(5));
+        store.delete(5);
+
+        // Flush the store and verify all current entries were properly flushed ...
+        store.flush();
+        assertNull(driver.flushedEntryStored(0));
+        assertNull(driver.flushedEntryStored(1));
+        assertNull(driver.flushedEntryStored(2));
+        assertEquals("three", driver.flushedEntryStored(3));
+        assertEquals("four", driver.flushedEntryStored(4));
+        assertNull(driver.flushedEntryStored(5));
+
+        assertEquals(true, driver.flushedEntryRemoved(0));
+        assertEquals(true, driver.flushedEntryRemoved(1));
+        assertEquals(true, driver.flushedEntryRemoved(2));
+        assertEquals(false, driver.flushedEntryRemoved(3));
+        assertEquals(false, driver.flushedEntryRemoved(4));
+        assertEquals(true, driver.flushedEntryRemoved(5));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testPutGetRangeWithDefaultSerdes() {
+        // Create the test driver ...
+        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
+
+        Serializer<Integer> keySer = (Serializer<Integer>) driver.context().keySerializer();
+        Deserializer<Integer> keyDeser = (Deserializer<Integer>) driver.context().keyDeserializer();
+        Serializer<String> valSer = (Serializer<String>) driver.context().valueSerializer();
+        Deserializer<String> valDeser = (Deserializer<String>) driver.context().valueDeserializer();
+        StateStoreSupplier supplier = Stores.create("my-store")
+                                                     .withKeys(keySer, keyDeser)
+                                                     .withValues(valSer, valDeser)
+                                                     .inMemory().maxEntries(3)
+                                                     .build();
+        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
+        store.init(driver.context());
+
+        // Verify that the store reads and writes correctly, keeping only the last 3 entries ...
+        store.put(0, "zero");
+        store.put(1, "one");
+        store.put(2, "two");
+        store.put(3, "three");
+        store.put(4, "four");
+        store.put(5, "five");
+
+        // It should only keep the last 3 added ...
+        assertEquals(3, driver.sizeOf(store));
+        assertNull(store.get(0));
+        assertNull(store.get(1));
+        assertNull(store.get(2));
+        assertEquals("three", store.get(3));
+        assertEquals("four", store.get(4));
+        assertEquals("five", store.get(5));
+        store.delete(5);
+
+        // Flush the store and verify all current entries were properly flushed ...
+        store.flush();
+        assertNull(driver.flushedEntryStored(0));
+        assertNull(driver.flushedEntryStored(1));
+        assertNull(driver.flushedEntryStored(2));
+        assertEquals("three", driver.flushedEntryStored(3));
+        assertEquals("four", driver.flushedEntryStored(4));
+        assertNull(driver.flushedEntryStored(5));
+
+        assertEquals(true, driver.flushedEntryRemoved(0));
+        assertEquals(true, driver.flushedEntryRemoved(1));
+        assertEquals(true, driver.flushedEntryRemoved(2));
+        assertEquals(false, driver.flushedEntryRemoved(3));
+        assertEquals(false, driver.flushedEntryRemoved(4));
+        assertEquals(true, driver.flushedEntryRemoved(5));
+    }
+
+    @Test
+    public void testRestore() {
+        // Create the test driver ...
+        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+
+        // Add any entries that will be restored to any store
+        // that uses the driver's context ...
+        driver.addEntryToRestoreLog(1, "one");
+        driver.addEntryToRestoreLog(2, "two");
+        driver.addEntryToRestoreLog(4, "four");
+
+        // Create the store, which should register with the context and automatically
+        // receive the restore entries ...
+        StateStoreSupplier supplier = Stores.create("my-store")
+                                                     .withIntegerKeys().withStringValues()
+                                                     .inMemory().maxEntries(3)
+                                                     .build();
+        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
+        store.init(driver.context());
+
+        // Verify that the store's contents were properly restored ...
+        assertEquals(0, driver.checkForRestoredEntries(store));
+
+        // and there are no other entries ...
+        assertEquals(3, driver.sizeOf(store));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
new file mode 100644
index 0000000..29a3c0a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+
+public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(
+            ProcessorContext context,
+            Class<K> keyClass,
+            Class<V> valueClass,
+            boolean useContextSerdes) {
+
+        StateStoreSupplier supplier;
+        if (useContextSerdes) {
+            Serializer<K> keySer = (Serializer<K>) context.keySerializer();
+            Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
+            Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
+            Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
+            supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build();
+        } else {
+            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).localDatabase().build();
+        }
+
+        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+        store.init(context);
+        return store;
+
+    }
+}


Mime
View raw message