kafka-commits mailing list archives

From ewe...@apache.org
Subject [1/2] kafka git commit: KAFKA-3060: Refactor MeteredStore and RocksDBStore Impl
Date Tue, 02 Feb 2016 00:11:27 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 66ecf3f08 -> 57da044a9


http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index d854c92..afb0f09 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -22,11 +22,13 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.Serdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.streams.state.WindowStoreUtil;
+import org.apache.kafka.streams.state.WindowStoreUtils;
+
 
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -46,7 +48,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         public final long id;
 
         Segment(String name, long id) {
-            super(name, WindowStoreUtil.INNER_SERDES);
+            super(name, WindowStoreUtils.INNER_SERDES);
             this.id = id;
         }
 
@@ -61,7 +63,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         private int index = 0;
 
         RocksDBWindowStoreIterator(Serdes<?, V> serdes) {
-            this(serdes, WindowStoreUtil.NO_ITERATORS);
+            this(serdes, WindowStoreUtils.NO_ITERATORS);
         }
 
         RocksDBWindowStoreIterator(Serdes<?, V> serdes, KeyValueIterator<byte[], byte[]>[] iterators) {
@@ -87,7 +89,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
             KeyValue<byte[], byte[]> kv = iterators[index].next();
 
-            return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(kv.key),
+            return new KeyValue<>(WindowStoreUtils.timestampFromBinaryKey(kv.key),
                                   serdes.valueFrom(kv.value));
         }
 
@@ -111,10 +113,14 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     private final Segment[] segments;
     private final Serdes<K, V> serdes;
     private final SimpleDateFormat formatter;
+    private final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
 
     private ProcessorContext context;
-    private long currentSegmentId = -1L;
     private int seqnum = 0;
+    private long currentSegmentId = -1L;
+
+    private boolean loggingEnabled = false;
+    private StoreChangeLogger<byte[], byte[]> changeLogger = null;
 
     public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes) {
         this.name = name;
@@ -127,11 +133,23 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
         this.retainDuplicates = retainDuplicates;
 
+        this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
+            public byte[] get(byte[] key) {
+                return getInternal(key);
+            }
+        };
+
         // Create a date formatter. Formatted timestamps are used as segment name suffixes
         this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
         this.formatter.setTimeZone(new SimpleTimeZone(0, "GMT"));
     }
 
+    public RocksDBWindowStore<K, V> enableLogging() {
+        loggingEnabled = true;
+
+        return this;
+    }
+
     @Override
     public String name() {
         return name;
@@ -140,6 +158,17 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     @Override
     public void init(ProcessorContext context) {
         this.context = context;
+
+        this.changeLogger = this.loggingEnabled ?
+                new RawStoreChangeLogger(name, context) : null;
+
+        // register and possibly restore the state from the logs
+        context.register(this, loggingEnabled, new StateRestoreCallback() {
+            @Override
+            public void restore(byte[] key, byte[] value) {
+                putInternal(key, value);
+            }
+        });
     }
 
     @Override
@@ -153,6 +182,9 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
             if (segment != null)
                 segment.flush();
         }
+
+        if (loggingEnabled)
+            changeLogger.logChange(this.getter);
     }
 
     @Override
@@ -165,16 +197,25 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     @Override
     public void put(K key, V value) {
-        putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP);
+        byte[] rawKey = putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP);
+
+        if (loggingEnabled) {
+            changeLogger.add(rawKey);
+            changeLogger.maybeLogChange(this.getter);
+        }
     }
 
     @Override
     public void put(K key, V value, long timestamp) {
-        putAndReturnInternalKey(key, value, timestamp);
+        byte[] rawKey = putAndReturnInternalKey(key, value, timestamp);
+
+        if (loggingEnabled) {
+            changeLogger.add(rawKey);
+            changeLogger.maybeLogChange(this.getter);
+        }
     }
 
-    @Override
-    public byte[] putAndReturnInternalKey(K key, V value, long t) {
+    private byte[] putAndReturnInternalKey(K key, V value, long t) {
         long timestamp = t == USE_CURRENT_TIMESTAMP ? context.timestamp() : t;
 
         long segmentId = segmentId(timestamp);
@@ -189,7 +230,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         if (segmentId > currentSegmentId - segments.length) {
             if (retainDuplicates)
                 seqnum = (seqnum + 1) & 0x7FFFFFFF;
-            byte[] binaryKey = WindowStoreUtil.toBinaryKey(key, timestamp, seqnum, serdes);
+            byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes);
             getSegment(segmentId).put(binaryKey, serdes.rawValue(value));
             return binaryKey;
         } else {
@@ -197,9 +238,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         }
     }
 
-    @Override
-    public void putInternal(byte[] binaryKey, byte[] binaryValue) {
-        long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey));
+    private void putInternal(byte[] binaryKey, byte[] binaryValue) {
+        long segmentId = segmentId(WindowStoreUtils.timestampFromBinaryKey(binaryKey));
 
         if (segmentId > currentSegmentId) {
             // A new segment will be created. Clean up old segments first.
@@ -212,9 +252,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
             getSegment(segmentId).put(binaryKey, binaryValue);
     }
 
-    @Override
-    public byte[] getInternal(byte[] binaryKey) {
-        long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey));
+    private byte[] getInternal(byte[] binaryKey) {
+        long segmentId = segmentId(WindowStoreUtils.timestampFromBinaryKey(binaryKey));
 
         Segment segment = segments[(int) (segmentId % segments.length)];
 
@@ -231,8 +270,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         long segFrom = segmentId(timeFrom);
         long segTo = segmentId(Math.max(0L, timeTo));
 
-        byte[] binaryFrom = WindowStoreUtil.toBinaryKey(key, timeFrom, 0, serdes);
-        byte[] binaryUntil = WindowStoreUtil.toBinaryKey(key, timeTo + 1L, 0, serdes);
+        byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
+        byte[] binaryUntil = WindowStoreUtils.toBinaryKey(key, timeTo + 1L, 0, serdes);
 
         ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>();
 
@@ -271,15 +310,16 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         }
     }
 
-    public long segmentId(long timestamp) {
+    private long segmentId(long timestamp) {
         return timestamp / segmentInterval;
     }
 
+    // this method is public since it is used by unit tests
     public String directorySuffix(long segmentId) {
         return formatter.format(new Date(segmentId * segmentInterval));
     }
 
-    // this method is used by a test
+    // this method is public since it is used by unit tests
     public Set<Long> segmentIds() {
         HashSet<Long> segmentIds = new HashSet<>();
 

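The change-logging flow added above has three parts: enableLogging() flags the store, init() registers a StateRestoreCallback so the store can be rebuilt from its changelog on restart, and each put() records the raw binary key as dirty so that flush() can emit the latest value per key through the ValueGetter. A minimal, self-contained sketch of that dirty-key pattern (illustrative names only, not the actual Kafka Streams API):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Sketch of the change-logging pattern above: writes only mark keys
    // dirty; flush drains the dirty set so the changelog always records
    // the latest value for each key.
    public class ChangeLoggingStoreSketch {
        private final Map<String, byte[]> inner = new HashMap<>(); // stands in for RocksDB
        private final Set<String> dirty = new HashSet<>();
        private boolean loggingEnabled = false;

        public ChangeLoggingStoreSketch enableLogging() {
            loggingEnabled = true;
            return this; // fluent style, as in RocksDBWindowStore.enableLogging()
        }

        public void put(String key, byte[] value) {
            inner.put(key, value);
            if (loggingEnabled)
                dirty.add(key); // defer the changelog write, as put() does above
        }

        public void flush() {
            if (!loggingEnabled)
                return;
            for (String key : dirty)
                System.out.printf("changelog <- %s=%s%n", key, new String(inner.get(key)));
            dirty.clear();
        }

        public static void main(String[] args) {
            ChangeLoggingStoreSketch store = new ChangeLoggingStoreSketch().enableLogging();
            store.put("a", "1".getBytes());
            store.put("a", "2".getBytes()); // only the latest value reaches the log
            store.flush();                  // prints: changelog <- a=2
        }
    }

Because writes only mark keys dirty, repeated updates to one key collapse into a single changelog record at the next flush.
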
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index fa85ce9..6823e6d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -36,7 +36,7 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
     private final long retentionPeriod;
     private final boolean retainDuplicates;
     private final int numSegments;
-    private final Serdes serdes;
+    private final Serdes<K, V> serdes;
     private final Time time;
 
     public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes, Time time) {
@@ -53,7 +53,7 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
     }
 
     public StateStore get() {
-        return new MeteredWindowStore<>(new RocksDBWindowStore<K, V>(name, retentionPeriod, numSegments, retainDuplicates, serdes), "rocksdb-window", time);
+        return new MeteredWindowStore<>(new RocksDBWindowStore<>(name, retentionPeriod, numSegments, retainDuplicates, serdes).enableLogging(), "rocksdb-window", time);
     }
 
 }

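Note that the supplier now chains enableLogging() on every store it builds, so a RocksDB window store obtained through this supplier is always change-logged; creating an unlogged store requires constructing RocksDBWindowStore directly.
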
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index da5544c..b330334 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -32,28 +32,41 @@ public class StoreChangeLogger<K, V> {
         V get(K key);
     }
 
+    // TODO: these values should be configurable
+    protected static final int DEFAULT_WRITE_BATCH_SIZE = 100;
+
     protected final Serdes<K, V> serialization;
 
-    private final Set<K> dirty;
-    private final Set<K> removed;
+    private final String topic;
+    private final int partition;
+    private final ProcessorContext context;
     private final int maxDirty;
     private final int maxRemoved;
 
-    private final String topic;
-    private int partition;
-    private ProcessorContext context;
+    protected Set<K> dirty;
+    protected Set<K> removed;
 
-    // always wrap the logged store with the metered store
     public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
+        this(topic, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
+    }
+
+    public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
+        this(topic, context, context.id().partition, serialization, maxDirty, maxRemoved);
+        init();
+    }
+
+    protected StoreChangeLogger(String topic, ProcessorContext context, int partition, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
         this.topic = topic;
-        this.serialization = serialization;
         this.context = context;
-        this.partition = context.id().partition;
+        this.partition = partition;
+        this.serialization = serialization;
+        this.maxDirty = maxDirty;
+        this.maxRemoved = maxRemoved;
+    }
 
+    public void init() {
         this.dirty = new HashSet<>();
         this.removed = new HashSet<>();
-        this.maxDirty = 100; // TODO: this needs to be configurable
-        this.maxRemoved = 100; // TODO: this needs to be configurable
     }
 
     public void add(K key) {
@@ -89,4 +102,18 @@ public class StoreChangeLogger<K, V> {
         }
     }
 
+    public void clear() {
+        this.removed.clear();
+        this.dirty.clear();
+    }
+
+    // this is for testing only
+    public int numDirty() {
+        return this.dirty.size();
+    }
+
+    // this is for testing only
+    public int numRemoved() {
+        return this.removed.size();
+    }
 }

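The refactored logger batches work in two sets: add() marks a key dirty (and un-removes it), delete() marks it removed (and un-dirties it), and maybeLogChange() flushes both sets through the ValueGetter once they grow past the maxDirty/maxRemoved thresholds taken by the new constructor. A distilled sketch of that bookkeeping follows; the exact flush condition is an assumption inferred from StoreChangeLoggerTest below, not copied from the source:

    import java.util.HashSet;
    import java.util.Set;

    // Distilled sketch of the dirty/removed batching in StoreChangeLogger.
    // Thresholds mirror the new (topic, context, serialization, maxDirty,
    // maxRemoved) constructor; the flush condition is an assumption.
    class BatchingLoggerSketch<K> {
        interface ValueGetter<T> { Object get(T key); }

        private final Set<K> dirty = new HashSet<>();
        private final Set<K> removed = new HashSet<>();
        private final int maxDirty;
        private final int maxRemoved;

        BatchingLoggerSketch(int maxDirty, int maxRemoved) {
            this.maxDirty = maxDirty;
            this.maxRemoved = maxRemoved;
        }

        void add(K key) {        // a put: key is now dirty, no longer a removal
            dirty.add(key);
            removed.remove(key);
        }

        void delete(K key) {     // a delete: key is a removal, no longer dirty
            dirty.remove(key);
            removed.add(key);
        }

        void maybeLogChange(ValueGetter<K> getter) {
            if (dirty.size() > maxDirty || removed.size() > maxRemoved)
                logChange(getter);
        }

        void logChange(ValueGetter<K> getter) {
            for (K key : removed)
                System.out.println(key + " -> null (tombstone)");
            for (K key : dirty)
                System.out.println(key + " -> " + getter.get(key)); // latest value wins
            removed.clear();
            dirty.clear();
        }

        public static void main(String[] args) {
            BatchingLoggerSketch<Integer> logger = new BatchingLoggerSketch<>(3, 3);
            logger.add(0);
            logger.add(1);
            logger.delete(0);
            logger.maybeLogChange(k -> "v" + k); // 1 dirty, 1 removed: below threshold, nothing flushed
            logger.logChange(k -> "v" + k);      // unconditional flush, as flush() does above
        }
    }

StoreChangeLoggerTest further down exercises exactly this add/delete bookkeeping via numDirty() and numRemoved().
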
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index cb6ea05..40cce93 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -37,7 +37,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateUtils;
+import org.apache.kafka.streams.state.StateTestUtils;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.ProcessorTopologyTestDriver;
@@ -65,7 +65,7 @@ public class ProcessorTopologyTest {
     @Before
     public void setup() {
         // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
-        File localState = StateUtils.tempDir();
+        File localState = StateTestUtils.tempDir();
         Properties props = new Properties();
         props.setProperty(StreamsConfig.JOB_ID_CONFIG, "processor-topology-test");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 1e9c3ba..daa7201 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -219,7 +219,6 @@ public class KeyValueStoreTestDriver<K, V> {
         return new KeyValueStoreTestDriver<K, V>(serdes);
     }
 
-    private final Serdes<K, V> serdes;
     private final Map<K, V> flushedEntries = new HashMap<>();
     private final Set<K> flushedRemovals = new HashSet<>();
     private final List<KeyValue<K, V>> restorableEntries = new LinkedList<>();
@@ -238,22 +237,40 @@ public class KeyValueStoreTestDriver<K, V> {
     private final RecordCollector recordCollector;
     private File stateDir = null;
 
-    protected KeyValueStoreTestDriver(Serdes<K, V> serdes) {
-        this.serdes = serdes;
+    protected KeyValueStoreTestDriver(final Serdes<K, V> serdes) {
         ByteArraySerializer rawSerializer = new ByteArraySerializer();
-        Producer<byte[], byte[]> producer = new MockProducer<byte[], byte[]>(true, rawSerializer, rawSerializer);
+        Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
+
         this.recordCollector = new RecordCollector(producer) {
+            @SuppressWarnings("unchecked")
             @Override
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
-                recordFlushed(record.key(), record.value());
+                // for byte arrays we need to deserialize them for comparison
+
+                K key;
+                if (record.key() instanceof byte[]) {
+                    key = serdes.keyFrom((byte[]) record.key());
+                } else {
+                    key = (K) record.key();
+                }
+
+                V value;
+                if (record.value() instanceof byte[]) {
+                    value = serdes.valueFrom((byte[]) record.value());
+                } else {
+                    value = (V) record.value();
+                }
+
+                recordFlushed(key, value);
             }
             @Override
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer,
                                     StreamPartitioner<K1, V1> partitioner) {
-                recordFlushed(record.key(), record.value());
+                // ignore partitioner
+                send(record, keySerializer, valueSerializer);
             }
         };
-        this.stateDir = StateUtils.tempDir();
+        this.stateDir = StateTestUtils.tempDir();
         this.stateDir.mkdirs();
 
         Properties props = new Properties();
@@ -279,7 +296,7 @@ public class KeyValueStoreTestDriver<K, V> {
             @Override
             public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback func) {
                 storeMap.put(store.name(), store);
-                restoreEntries(func);
+                restoreEntries(func, serdes);
             }
 
             @Override
@@ -299,21 +316,19 @@ public class KeyValueStoreTestDriver<K, V> {
         };
     }
 
-    @SuppressWarnings("unchecked")
-    protected <K1, V1> void recordFlushed(K1 key, V1 value) {
-        K k = (K) key;
+    protected void recordFlushed(K key, V value) {
         if (value == null) {
             // This is a removal ...
-            flushedRemovals.add(k);
-            flushedEntries.remove(k);
+            flushedRemovals.add(key);
+            flushedEntries.remove(key);
         } else {
             // This is a normal add
-            flushedEntries.put(k, (V) value);
-            flushedRemovals.remove(k);
+            flushedEntries.put(key, value);
+            flushedRemovals.remove(key);
         }
     }
 
-    private void restoreEntries(StateRestoreCallback func) {
+    private void restoreEntries(StateRestoreCallback func, Serdes<K, V> serdes) {
         for (KeyValue<K, V> entry : restorableEntries) {
             if (entry != null) {
                 byte[] rawKey = serdes.rawKey(entry.key);
@@ -440,6 +455,13 @@ public class KeyValueStoreTestDriver<K, V> {
     }
 
     /**
+     * Return the number of flushed removals
+     */
+    public int numFlushedEntryRemoved() {
+        return flushedRemovals.size();
+    }
+
+    /**
     * Remove all {@link #flushedEntryStored(Object) flushed entries} and {@link #flushedEntryRemoved(Object) flushed removals}.
      */
     public void clear() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java
new file mode 100644
index 0000000..70e6cf6
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StateTestUtils.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A utility for tests to create and manage unique and isolated directories on the file system for local state.
+ */
+public class StateTestUtils {
+
+    private static final AtomicLong INSTANCE_COUNTER = new AtomicLong();
+
+    /**
+     * Create a new temporary directory that will be cleaned up automatically upon shutdown.
+     * @return the new directory that will exist; never null
+     */
+    public static File tempDir() {
+        try {
+            final File dir = Files.createTempDirectory("test").toFile();
+            dir.mkdirs();
+            dir.deleteOnExit();
+
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    deleteDirectory(dir);
+                }
+            });
+            return dir;
+        } catch (IOException ex) {
+            throw new RuntimeException("Failed to create a temp dir", ex);
+        }
+    }
+
+    private static void deleteDirectory(File dir) {
+        if (dir != null && dir.exists()) {
+            try {
+                Files.walkFileTree(dir.toPath(), new SimpleFileVisitor<Path>() {
+                    @Override
+                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                        Files.delete(file);
+                        return FileVisitResult.CONTINUE;
+                    }
+
+                    @Override
+                    public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+                        Files.delete(dir);
+                        return FileVisitResult.CONTINUE;
+                    }
+
+                });
+            } catch (IOException e) {
+                // do nothing
+            }
+        }
+    }
+}

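StateTestUtils is a straight rename of the deleted StateUtils below; typical usage (as in ProcessorTopologyTest above) is one isolated state directory per test, cleaned up by a shutdown hook. A tiny hypothetical sketch:

    import java.io.File;

    import org.apache.kafka.streams.state.StateTestUtils;

    // Hypothetical scaffold showing the intended use of the renamed helper:
    // every test run gets its own throwaway state directory (registered for
    // deletion at JVM shutdown), so tests can run in parallel safely.
    public class TempDirUsageSketch {
        public static void main(String[] args) {
            File localState = StateTestUtils.tempDir();
            System.out.println("state dir: " + localState.getAbsolutePath());
        }
    }
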
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
deleted file mode 100644
index c014ae5..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * A utility for tests to create and manage unique and isolated directories on the file system for local state.
- */
-public class StateUtils {
-
-    private static final AtomicLong INSTANCE_COUNTER = new AtomicLong();
-
-    /**
-     * Create a new temporary directory that will be cleaned up automatically upon shutdown.
-     * @return the new directory that will exist; never null
-     */
-    public static File tempDir() {
-        try {
-            final File dir = Files.createTempDirectory("test").toFile();
-            dir.mkdirs();
-            dir.deleteOnExit();
-
-            Runtime.getRuntime().addShutdownHook(new Thread() {
-                @Override
-                public void run() {
-                    deleteDirectory(dir);
-                }
-            });
-            return dir;
-        } catch (IOException ex) {
-            throw new RuntimeException("Failed to create a temp dir", ex);
-        }
-    }
-
-    private static void deleteDirectory(File dir) {
-        if (dir != null && dir.exists()) {
-            try {
-                Files.walkFileTree(dir.toPath(), new SimpleFileVisitor<Path>() {
-                    @Override
-                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
-                        Files.delete(file);
-                        return FileVisitResult.CONTINUE;
-                    }
-
-                    @Override
-                    public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
-                        Files.delete(dir);
-                        return FileVisitResult.CONTINUE;
-                    }
-
-                });
-            } catch (IOException e) {
-                // do nothing
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 8effd77..ee343e8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -37,7 +37,7 @@ public abstract class AbstractKeyValueStoreTest {
     @Test
     public void testPutGetRange() {
         // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
+        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
         KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
         try {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index d7cc5b9..10f31d6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -16,144 +16,95 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.Test;
 
-public class InMemoryLRUCacheStoreTest {
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testPutGetRange() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-        StateStoreSupplier supplier = Stores.create("my-store")
-                                                     .withIntegerKeys().withStringValues()
-                                                     .inMemory().maxEntries(3)
-                                                     .build();
-        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
-        store.init(driver.context());
-
-        // Verify that the store reads and writes correctly, keeping only the last 2 entries ...
-        store.put(0, "zero");
-        store.put(1, "one");
-        store.put(2, "two");
-        store.put(3, "three");
-        store.put(4, "four");
-        store.put(5, "five");
-
-        // It should only keep the last 4 added ...
-        assertEquals(3, driver.sizeOf(store));
-        assertNull(store.get(0));
-        assertNull(store.get(1));
-        assertNull(store.get(2));
-        assertEquals("three", store.get(3));
-        assertEquals("four", store.get(4));
-        assertEquals("five", store.get(5));
-        store.delete(5);
-
-        // Flush the store and verify all current entries were properly flushed ...
-        store.flush();
-        assertNull(driver.flushedEntryStored(0));
-        assertNull(driver.flushedEntryStored(1));
-        assertNull(driver.flushedEntryStored(2));
-        assertEquals("three", driver.flushedEntryStored(3));
-        assertEquals("four", driver.flushedEntryStored(4));
-        assertNull(driver.flushedEntryStored(5));
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-        assertEquals(true, driver.flushedEntryRemoved(0));
-        assertEquals(true, driver.flushedEntryRemoved(1));
-        assertEquals(true, driver.flushedEntryRemoved(2));
-        assertEquals(false, driver.flushedEntryRemoved(3));
-        assertEquals(false, driver.flushedEntryRemoved(4));
-        assertEquals(true, driver.flushedEntryRemoved(5));
-    }
+public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
 
     @SuppressWarnings("unchecked")
-    @Test
-    public void testPutGetRangeWithDefaultSerdes() {
-        // Create the test driver ...
-        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-
-        Serializer<Integer> keySer = (Serializer<Integer>) driver.context().keySerializer();
-        Deserializer<Integer> keyDeser = (Deserializer<Integer>) driver.context().keyDeserializer();
-        Serializer<String> valSer = (Serializer<String>) driver.context().valueSerializer();
-        Deserializer<String> valDeser = (Deserializer<String>) driver.context().valueDeserializer();
-        StateStoreSupplier supplier = Stores.create("my-store")
-                                                     .withKeys(keySer, keyDeser)
-                                                     .withValues(valSer, valDeser)
-                                                     .inMemory().maxEntries(3)
-                                                     .build();
-        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
-        store.init(driver.context());
-
-        // Verify that the store reads and writes correctly, keeping only the last 2 entries ...
-        store.put(0, "zero");
-        store.put(1, "one");
-        store.put(2, "two");
-        store.put(3, "three");
-        store.put(4, "four");
-        store.put(5, "five");
-
-        // It should only keep the last 4 added ...
-        assertEquals(3, driver.sizeOf(store));
-        assertNull(store.get(0));
-        assertNull(store.get(1));
-        assertNull(store.get(2));
-        assertEquals("three", store.get(3));
-        assertEquals("four", store.get(4));
-        assertEquals("five", store.get(5));
-        store.delete(5);
-
-        // Flush the store and verify all current entries were properly flushed ...
-        store.flush();
-        assertNull(driver.flushedEntryStored(0));
-        assertNull(driver.flushedEntryStored(1));
-        assertNull(driver.flushedEntryStored(2));
-        assertEquals("three", driver.flushedEntryStored(3));
-        assertEquals("four", driver.flushedEntryStored(4));
-        assertNull(driver.flushedEntryStored(5));
-
-        assertEquals(true, driver.flushedEntryRemoved(0));
-        assertEquals(true, driver.flushedEntryRemoved(1));
-        assertEquals(true, driver.flushedEntryRemoved(2));
-        assertEquals(false, driver.flushedEntryRemoved(3));
-        assertEquals(false, driver.flushedEntryRemoved(4));
-        assertEquals(true, driver.flushedEntryRemoved(5));
+    @Override
+    protected <K, V> KeyValueStore<K, V> createKeyValueStore(
+            ProcessorContext context,
+            Class<K> keyClass,
+            Class<V> valueClass,
+            boolean useContextSerdes) {
+
+        StateStoreSupplier supplier;
+        if (useContextSerdes) {
+            Serializer<K> keySer = (Serializer<K>) context.keySerializer();
+            Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
+            Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
+            Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
+            supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().maxEntries(10).build();
+        } else {
+            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().maxEntries(10).build();
+        }
+
+        KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+        store.init(context);
+        return store;
     }
 
     @Test
-    public void testRestore() {
+    public void testEvict() {
         // Create the test driver ...
         KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-
-        // Add any entries that will be restored to any store
-        // that uses the driver's context ...
-        driver.addEntryToRestoreLog(1, "one");
-        driver.addEntryToRestoreLog(2, "two");
-        driver.addEntryToRestoreLog(4, "four");
-
-        // Create the store, which should register with the context and automatically
-        // receive the restore entries ...
-        StateStoreSupplier supplier = Stores.create("my-store")
-                                                     .withIntegerKeys().withStringValues()
-                                                     .inMemory().maxEntries(3)
-                                                     .build();
-        KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
-        store.init(driver.context());
-
-        // Verify that the store's contents were properly restored ...
-        assertEquals(0, driver.checkForRestoredEntries(store));
-
-        // and there are no other entries ...
-        assertEquals(3, driver.sizeOf(store));
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+
+        try {
+            store.put(0, "zero");
+            store.put(1, "one");
+            store.put(2, "two");
+            store.put(3, "three");
+            store.put(4, "four");
+            store.put(5, "five");
+            store.put(6, "six");
+            store.put(7, "seven");
+            store.put(8, "eight");
+            store.put(9, "nine");
+            assertEquals(10, driver.sizeOf(store));
+
+            store.put(10, "ten");
+            store.flush();
+            assertEquals(10, driver.sizeOf(store));
+            assertTrue(driver.flushedEntryRemoved(0));
+            assertEquals(1, driver.numFlushedEntryRemoved());
+
+            store.delete(1);
+            store.flush();
+            assertEquals(9, driver.sizeOf(store));
+            assertTrue(driver.flushedEntryRemoved(0));
+            assertTrue(driver.flushedEntryRemoved(1));
+            assertEquals(2, driver.numFlushedEntryRemoved());
+
+            store.put(11, "eleven");
+            store.flush();
+            assertEquals(10, driver.sizeOf(store));
+            assertEquals(2, driver.numFlushedEntryRemoved());
+
+            store.put(2, "two-again");
+            store.flush();
+            assertEquals(10, driver.sizeOf(store));
+            assertEquals(2, driver.numFlushedEntryRemoved());
+
+            store.put(12, "twelve");
+            store.flush();
+            assertEquals(10, driver.sizeOf(store));
+            assertTrue(driver.flushedEntryRemoved(0));
+            assertTrue(driver.flushedEntryRemoved(1));
+            assertTrue(driver.flushedEntryRemoved(2));
+            assertEquals(3, driver.numFlushedEntryRemoved());
+        } finally {
+            store.close();
+        }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
new file mode 100644
index 0000000..352e330
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class OffsetCheckpointTest {
+
+    private final String topic = "topic";
+
+    private OffsetCheckpoint checkpoint = null;
+
+    @Test
+    public void testReadWrite() throws IOException {
+        File f = new File("/tmp/kafka-streams/offset_checkpoint.test");
+        checkpoint = new OffsetCheckpoint(f);
+
+        try {
+            Map<TopicPartition, Long> offsets = new HashMap<>();
+            offsets.put(new TopicPartition(topic, 0), 0L);
+            offsets.put(new TopicPartition(topic, 1), 1L);
+            offsets.put(new TopicPartition(topic, 2), 2L);
+
+            checkpoint.write(offsets);
+            assertEquals(offsets, checkpoint.read());
+
+            checkpoint.delete();
+            assertFalse(f.exists());
+
+            offsets.put(new TopicPartition(topic, 3), 3L);
+            checkpoint.write(offsets);
+            assertEquals(offsets, checkpoint.read());
+        } finally {
+            checkpoint.delete();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index 29a3c0a..b9703db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -39,9 +39,9 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
             Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
             Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
             Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
-            supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build();
+            supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).persistent().build();
         } else {
-            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).localDatabase().build();
+            supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).persistent().build();
         }
 
         KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 45448e5..94385c8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -31,7 +31,7 @@ import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.state.Serdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.streams.state.WindowStoreUtil;
+import org.apache.kafka.streams.state.WindowStoreUtils;
 import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Test;
 
@@ -58,8 +58,10 @@ public class RocksDBWindowStoreTest {
     private final long windowSize = 3;
     private final Serdes<Integer, String> serdes = Serdes.withBuiltinTypes("", Integer.class, String.class);
 
+    @SuppressWarnings("unchecked")
     protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, Serdes<K, V> serdes) {
         StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", retentionPeriod, numSegments, true, serdes, null);
+
         WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
         store.init(context);
         return store;
@@ -659,8 +661,8 @@ public class RocksDBWindowStoreTest {
         HashMap<Integer, Set<String>> entriesByKey = new HashMap<>();
 
         for (KeyValue<byte[], byte[]> entry : changeLog) {
-            long timestamp = WindowStoreUtil.timestampFromBinaryKey(entry.key);
-            Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key, serdes);
+            long timestamp = WindowStoreUtils.timestampFromBinaryKey(entry.key);
+            Integer key = WindowStoreUtils.keyFromBinaryKey(entry.key, serdes);
             String value = entry.value == null ? null : serdes.valueFrom(entry.value);
 
             Set<String> entries = entriesByKey.get(key);

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
new file mode 100644
index 0000000..5f014ef
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.test.MockProcessorContext;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class StoreChangeLoggerTest {
+
+    private final String topic = "topic";
+
+    private final Map<Integer, String> logged = new HashMap<>();
+    private final Map<Integer, String> written = new HashMap<>();
+
+    private final ProcessorContext context = new MockProcessorContext(Serdes.withBuiltinTypes(topic, Integer.class, String.class),
+            new RecordCollector(null) {
+                @SuppressWarnings("unchecked")
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    logged.put((Integer) record.key(), (String) record.value());
+                }
+
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer,
+                                          StreamPartitioner<K1, V1> partitioner) {
+                    // ignore partitioner
+                    send(record, keySerializer, valueSerializer);
+                }
+            }
+    );
+
+    private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, Serdes.withBuiltinTypes(topic, Integer.class, String.class), 3, 3);
+
+    private final StoreChangeLogger<byte[], byte[]> rawChangeLogger = new RawStoreChangeLogger(topic, context, 3, 3);
+
+    private final StoreChangeLogger.ValueGetter<Integer, String> getter = new StoreChangeLogger.ValueGetter<Integer, String>() {
+        @Override
+        public String get(Integer key) {
+            return written.get(key);
+        }
+    };
+
+    private final StoreChangeLogger.ValueGetter<byte[], byte[]> rawGetter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
+        private IntegerDeserializer deserializer = new IntegerDeserializer();
+        private StringSerializer serializer = new StringSerializer();
+
+        @Override
+        public byte[] get(byte[] key) {
+            return serializer.serialize(topic, written.get(deserializer.deserialize(topic, key)));
+        }
+    };
+
+    @Test
+    public void testAddRemove() {
+        written.put(0, "zero");
+        changeLogger.add(0);
+        written.put(1, "one");
+        changeLogger.add(1);
+        written.put(2, "two");
+        changeLogger.add(2);
+        assertEquals(3, changeLogger.numDirty());
+        assertEquals(0, changeLogger.numRemoved());
+
+        changeLogger.delete(0);
+        changeLogger.delete(1);
+        written.put(3, "three");
+        changeLogger.add(3);
+        assertEquals(2, changeLogger.numDirty());
+        assertEquals(2, changeLogger.numRemoved());
+
+        written.put(0, "zero-again");
+        changeLogger.add(0);
+        assertEquals(3, changeLogger.numDirty());
+        assertEquals(1, changeLogger.numRemoved());
+
+        written.put(4, "four");
+        changeLogger.add(4);
+        changeLogger.maybeLogChange(getter);
+        assertEquals(0, changeLogger.numDirty());
+        assertEquals(0, changeLogger.numRemoved());
+        assertEquals(5, logged.size());
+        assertEquals("zero-again", logged.get(0));
+        assertEquals(null, logged.get(1));
+        assertEquals("two", logged.get(2));
+        assertEquals("three", logged.get(3));
+        assertEquals("four", logged.get(4));
+    }
+
+    @Test
+    public void testRaw() {
+        IntegerSerializer serializer = new IntegerSerializer();
+
+        written.put(0, "zero");
+        rawChangeLogger.add(serializer.serialize(topic, 0));
+        written.put(1, "one");
+        rawChangeLogger.add(serializer.serialize(topic, 1));
+        written.put(2, "two");
+        rawChangeLogger.add(serializer.serialize(topic, 2));
+        assertEquals(3, rawChangeLogger.numDirty());
+        assertEquals(0, rawChangeLogger.numRemoved());
+
+        rawChangeLogger.delete(serializer.serialize(topic, 0));
+        rawChangeLogger.delete(serializer.serialize(topic, 1));
+        written.put(3, "three");
+        rawChangeLogger.add(serializer.serialize(topic, 3));
+        assertEquals(2, rawChangeLogger.numDirty());
+        assertEquals(2, rawChangeLogger.numRemoved());
+
+        written.put(0, "zero-again");
+        rawChangeLogger.add(serializer.serialize(topic, 0));
+        assertEquals(3, rawChangeLogger.numDirty());
+        assertEquals(1, rawChangeLogger.numRemoved());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index cb7a95c..31b8335 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.Serdes;
 
 import java.io.File;
 import java.util.Collections;
@@ -49,6 +50,16 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
 
     long timestamp = -1L;
 
+    public MockProcessorContext(Serdes<?, ?> serdes, RecordCollector collector) {
+        this(null, null, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(), serdes.valueDeserializer(), collector);
+    }
+
+    public MockProcessorContext(Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
+                                Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
+                                RecordCollector collector) {
+        this(null, null, keySerializer, keyDeserializer, valueSerializer, valueDeserializer, collector);
+    }
+
     public MockProcessorContext(KStreamTestDriver driver, File stateDir,
                                 Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
                                 Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
index 6810841..73d446f 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -49,9 +49,9 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
     @Override
     public StateStore get() {
         if (loggingEnabled) {
-            return new MockStateStore(name, persistent);
+            return new MockStateStore(name, persistent).enableLogging();
         } else {
-            return new MockStateStore(name, persistent).disableLogging();
+            return new MockStateStore(name, persistent);
         }
     }
 
@@ -59,7 +59,7 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
         private final String name;
         private final boolean persistent;
 
-        public boolean loggingEnabled = true;
+        public boolean loggingEnabled = false;
         public boolean initialized = false;
         public boolean flushed = false;
         public boolean closed = false;
@@ -70,8 +70,8 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
             this.persistent = persistent;
         }
 
-        public MockStateStore disableLogging() {
-            loggingEnabled = false;
+        public MockStateStore enableLogging() {
+            loggingEnabled = true;
             return this;
         }
 

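The mock's default flips here from logging-on to logging-off, with an enableLogging() method replacing disableLogging(); this mirrors the new opt-in convention on the real stores, where the supplier must explicitly chain enableLogging().
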
