kafka-commits mailing list archives

From mj...@apache.org
Subject [kafka] branch trunk updated: MINOR: code cleanup (#6054)
Date Mon, 14 Jan 2019 21:36:50 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 82d1db6  MINOR: code cleanup (#6054)
82d1db6 is described below

commit 82d1db635826246570287cfb4c0b3572e3b011b4
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Mon Jan 14 13:36:36 2019 -0800

    MINOR: code cleanup (#6054)
    
    Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Ryanne Dolan <ryannedolan@gmail.com>, Ismael Juma <ismael@confuent.io>
---
 checkstyle/suppressions.xml                        |   2 +
 .../internals/graph/KTableKTableJoinNode.java      |   4 +-
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  50 +-
 .../streams/integration/EosIntegrationTest.java    |  56 +-
 .../CopartitionedTopicsValidatorTest.java          |  32 +-
 .../state/internals/CachingSessionStoreTest.java   |  23 +-
 .../state/internals/CachingWindowStoreTest.java    | 108 ++-
 .../CompositeReadOnlySessionStoreTest.java         |   4 +-
 .../CompositeReadOnlyWindowStoreTest.java          |   3 +-
 ...SortedCacheWrappedSessionStoreIteratorTest.java |  27 +-
 ...acheWrappedWindowStoreKeyValueIteratorTest.java |  33 +-
 .../state/internals/RocksDBSessionStoreTest.java   |  78 +-
 .../streams/state/internals/RocksDBStoreTest.java  |   5 +-
 .../state/internals/RocksDBWindowStoreTest.java    | 837 +++++++++++++++++----
 .../state/internals/SegmentIteratorTest.java       |  72 +-
 .../state/internals/StoreChangeLoggerTest.java     |  13 +-
 .../StreamThreadStateStoreProviderTest.java        |  40 +-
 .../streams/tests/StreamsStandByReplicaTest.java   |  64 +-
 .../apache/kafka/test/KTableValueGetterStub.java   |  50 --
 .../java/org/apache/kafka/test/MockAggregator.java |   7 +-
 .../org/apache/kafka/test/MockInitializer.java     |   1 -
 .../java/org/apache/kafka/test/MockProcessor.java  |  17 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |  15 +-
 .../kafka/streams/TopologyTestDriverTest.java      |   7 +-
 24 files changed, 1052 insertions(+), 496 deletions(-)
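
Most of the diff below applies two recurring cleanup patterns: replacing explicit try/finally close() blocks on KafkaStreams with try-with-resources, and replacing anonymous Properties subclasses (instance-initializer blocks) with Utils.mkProperties. The sketch below is illustrative only and is not part of the commit; the class name, topic, and config values are placeholders, and it relies on KafkaStreams being usable in try-with-resources, as the changed tests themselves do.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.common.utils.Utils;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;

    // Illustrative sketch only; not taken from the commit.
    public class CleanupPatternSketch {

        // Old style: manual close() in a finally block.
        void beforeCleanup(final StreamsBuilder builder, final Properties props) {
            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            try {
                streams.start();
            } finally {
                streams.close();
            }
        }

        // New style: try-with-resources closes the KafkaStreams instance automatically.
        void afterCleanup(final StreamsBuilder builder, final Properties props) {
            try (final KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
                streams.start();
            }
        }

        // Utils.mkProperties replaces the anonymous Properties subclass idiom,
        // as done in EosIntegrationTest below.
        Properties brokerConfig() {
            return Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "false"));
        }
    }

Both forms close the client on every path; try-with-resources simply makes that guarantee explicit and shorter.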

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7ac8c7d..1dcb133 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -165,6 +165,8 @@
               files="KStreamKStreamJoinTest.java"/>
     <suppress checks="MethodLength"
               files="KStreamWindowAggregateTest.java"/>
+    <suppress checks="MethodLength"
+              files="RocksDBWindowStoreTest.java"/>
 
     <suppress checks="ClassDataAbstractionCoupling"
               files=".*[/\\]streams[/\\].*test[/\\].*.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index 796468e..aeda0d9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -86,8 +86,8 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
                                                        joinThisStoreNames);
 
         if (materializedInternal != null) {
-            final StoreBuilder<KeyValueStore<K, VR>> storeBuilder
-                = new KeyValueStoreMaterializer<>(materializedInternal).materialize();
+            final StoreBuilder<KeyValueStore<K, VR>> storeBuilder =
+                new KeyValueStoreMaterializer<>(materializedInternal).materialize();
             topologyBuilder.addStateStore(storeBuilder, mergeProcessorName);
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 89f3730..82000ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -16,10 +16,6 @@
  */
 package org.apache.kafka.streams;
 
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Duration;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.MockProducer;
@@ -61,6 +57,10 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -215,7 +215,7 @@ public class KafkaStreamsTest {
     @Test
     public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
         builder.globalTable("anyTopic");
-        final List<Node> nodes = asList(new Node(0, "localhost", 8121));
+        final List<Node> nodes = Collections.singletonList(new Node(0, "localhost", 8121));
         final Cluster cluster = new Cluster("mockClusterId", nodes,
                                             Collections.emptySet(), Collections.emptySet(),
                                             Collections.emptySet(), nodes.get(0));
@@ -326,14 +326,11 @@ public class KafkaStreamsTest {
 
         // make sure we have the global state thread running too
         builder.globalTable("anyTopic");
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        try {
+        try (final KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
             streams.start();
             fail("expected start() to time out and throw an exception.");
         } catch (final StreamsException expected) {
             // This is a result of not being able to connect to the broker.
-        } finally {
-            streams.close();
         }
         // There's nothing to assert... We're testing that this operation actually completes.
     }
@@ -349,11 +346,8 @@ public class KafkaStreamsTest {
 
         // make sure we have the global state thread running too
         builder.table("anyTopic");
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        try {
+        try (final KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
             streams.start();
-        } finally {
-            streams.close();
         }
         // There's nothing to assert... We're testing that this operation actually completes.
     }
@@ -362,9 +356,8 @@ public class KafkaStreamsTest {
     @Test
     public void testInitializesAndDestroysMetricsReporters() {
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
-        try {
+        try (final KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
             final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
             final int initDiff = newInitCount - oldInitCount;
             assertTrue("some reporters should be initialized by calling on construction", initDiff > 0);
@@ -373,8 +366,6 @@ public class KafkaStreamsTest {
             final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
             streams.close();
             assertEquals(oldCloseCount + initDiff, MockMetricsReporter.CLOSE_COUNT.get());
-        } finally {
-            streams.close();
         }
     }
 
@@ -584,8 +575,7 @@ public class KafkaStreamsTest {
 
         builder.table(topic, Materialized.as("store"));
 
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        try {
+        try (final KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
             final CountDownLatch latch = new CountDownLatch(1);
             streams.setStateListener((newState, oldState) -> {
                 if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
@@ -601,21 +591,16 @@ public class KafkaStreamsTest {
             verifyCleanupStateDir(appDir, oldTaskDir);
             assertTrue(oldTaskDir.mkdirs());
             verifyCleanupStateDir(appDir, oldTaskDir);
-        } finally {
-            streams.close();
         }
     }
 
     @Test
     public void shouldThrowOnNegativeTimeoutForClose() {
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        try {
+        try (final KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
             streams.close(Duration.ofMillis(-1L));
             fail("should not accept negative close parameter");
         } catch (final IllegalArgumentException e) {
             // expected
-        } finally {
-            streams.close();
         }
     }
 
@@ -684,9 +669,12 @@ public class KafkaStreamsTest {
                                          final String globalStoreName,
                                          final boolean isPersistentStore) throws Exception {
         CLUSTER.createTopics(inputTopic, outputTopic, globalTopicName);
-        final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(isPersistentStore ?
-                Stores.persistentKeyValueStore(storeName) : Stores.inMemoryKeyValueStore(storeName), 
-                Serdes.String(), Serdes.Long());
+        final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
+            isPersistentStore ?
+                Stores.persistentKeyValueStore(storeName)
+                : Stores.inMemoryKeyValueStore(storeName),
+            Serdes.String(),
+            Serdes.Long());
         final Topology topology = new Topology();
         topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic)
                 .addProcessor("process", () -> new AbstractProcessor<String, String>() {
@@ -769,7 +757,8 @@ public class KafkaStreamsTest {
         }
     }
 
-    private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException {
+    private void verifyCleanupStateDir(final String appDir,
+                                       final File oldTaskDir) throws InterruptedException {
         final File taskDir = new File(appDir, "0_0");
         TestUtils.waitForCondition(
             () -> !oldTaskDir.exists() && taskDir.exists(),
@@ -785,7 +774,8 @@ public class KafkaStreamsTest {
         public Map<KafkaStreams.State, Long> mapStates = new HashMap<>();
 
         @Override
-        public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
+        public void onChange(final KafkaStreams.State newState,
+                             final KafkaStreams.State oldState) {
             final long prevCount = mapStates.containsKey(newState) ? mapStates.get(newState) : 0;
             numChanges++;
             this.oldState = oldState;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 505d454..f43b396 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -73,11 +73,10 @@ public class EosIntegrationTest {
     private static final int MAX_WAIT_TIME_MS = 60 * 1000;
 
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, new Properties() {
-        {
-            put("auto.create.topics.enable", false);
-        }
-    });
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
+        NUM_BROKERS,
+        Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "false"))
+    );
 
     private static String applicationId;
     private final static int NUM_TOPIC_PARTITIONS = 2;
@@ -154,23 +153,22 @@ public class EosIntegrationTest {
         }
         output.to(outputTopic);
 
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
+        properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+
         for (int i = 0; i < numberOfRestarts; ++i) {
             final Properties config = StreamsTestUtils.getStreamsConfig(
                 applicationId,
                 CLUSTER.bootstrapServers(),
                 Serdes.LongSerde.class.getName(),
                 Serdes.LongSerde.class.getName(),
-                new Properties() {
-                    {
-                        put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
-                        put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-                        put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-                        put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
-                        put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
-                        put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
-                        put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-                    }
-                });
+                properties);
 
             try (final KafkaStreams streams = new KafkaStreams(builder.build(), config)) {
                 streams.start();
@@ -275,11 +273,10 @@ public class EosIntegrationTest {
                         CONSUMER_GROUP_ID,
                         LongDeserializer.class,
                         LongDeserializer.class,
-                        new Properties() {
-                            {
-                                put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
-                            }
-                        }),
+                        Utils.mkProperties(Collections.singletonMap(
+                            ConsumerConfig.ISOLATION_LEVEL_CONFIG,
+                            IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)))
+                        ),
                     SINGLE_PARTITION_OUTPUT_TOPIC,
                     firstBurstOfData.size()
                 );
@@ -300,11 +297,10 @@ public class EosIntegrationTest {
                         CONSUMER_GROUP_ID,
                         LongDeserializer.class,
                         LongDeserializer.class,
-                        new Properties() {
-                            {
-                                put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
-                            }
-                        }),
+                        Utils.mkProperties(Collections.singletonMap(
+                            ConsumerConfig.ISOLATION_LEVEL_CONFIG,
+                            IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)))
+                        ),
                     SINGLE_PARTITION_OUTPUT_TOPIC,
                     secondBurstOfData.size()
                 );
@@ -691,11 +687,9 @@ public class EosIntegrationTest {
                     groupId,
                     LongDeserializer.class,
                     LongDeserializer.class,
-                    new Properties() {
-                        {
-                            put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
-                        }
-                    }),
+                    Utils.mkProperties(Collections.singletonMap(
+                        ConsumerConfig.ISOLATION_LEVEL_CONFIG,
+                        IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)))),
                 SINGLE_PARTITION_OUTPUT_TOPIC,
                 numberOfRecords
             );
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
index a71f488..d6119fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
@@ -33,23 +33,31 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 public class CopartitionedTopicsValidatorTest {
 
-    private final StreamsPartitionAssignor.CopartitionedTopicsValidator validator
-            = new StreamsPartitionAssignor.CopartitionedTopicsValidator("thread");
+    private final StreamsPartitionAssignor.CopartitionedTopicsValidator validator =
+        new StreamsPartitionAssignor.CopartitionedTopicsValidator("thread");
     private final Map<TopicPartition, PartitionInfo> partitions = new HashMap<>();
     private final Cluster cluster = Cluster.empty();
 
     @Before
     public void before() {
-        partitions.put(new TopicPartition("first", 0), new PartitionInfo("first", 0, null, null, null));
-        partitions.put(new TopicPartition("first", 1), new PartitionInfo("first", 1, null, null, null));
-        partitions.put(new TopicPartition("second", 0), new PartitionInfo("second", 0, null, null, null));
-        partitions.put(new TopicPartition("second", 1), new PartitionInfo("second", 1, null, null, null));
+        partitions.put(
+            new TopicPartition("first", 0),
+            new PartitionInfo("first", 0, null, null, null));
+        partitions.put(
+            new TopicPartition("first", 1),
+            new PartitionInfo("first", 1, null, null, null));
+        partitions.put(
+            new TopicPartition("second", 0),
+            new PartitionInfo("second", 0, null, null, null));
+        partitions.put(
+            new TopicPartition("second", 1),
+            new PartitionInfo("second", 1, null, null, null));
     }
 
     @Test(expected = IllegalStateException.class)
     public void shouldThrowTopologyBuilderExceptionIfNoPartitionsFoundForCoPartitionedTopic() {
         validator.validate(Collections.singleton("topic"),
-                           Collections.<String, StreamsPartitionAssignor.InternalTopicMetadata>emptyMap(),
+                           Collections.emptyMap(),
                            cluster);
     }
 
@@ -57,7 +65,7 @@ public class CopartitionedTopicsValidatorTest {
     public void shouldThrowTopologyBuilderExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch() {
         partitions.remove(new TopicPartition("second", 0));
         validator.validate(Utils.mkSet("first", "second"),
-                           Collections.<String, StreamsPartitionAssignor.InternalTopicMetadata>emptyMap(),
+                           Collections.emptyMap(),
                            cluster.withPartitions(partitions));
     }
 
@@ -100,11 +108,11 @@ public class CopartitionedTopicsValidatorTest {
 
     private StreamsPartitionAssignor.InternalTopicMetadata createTopicMetadata(final String repartitionTopic,
                                                                                final int partitions) {
-        final InternalTopicConfig repartitionTopicConfig
-                = new RepartitionTopicConfig(repartitionTopic, Collections.<String, String>emptyMap());
+        final InternalTopicConfig repartitionTopicConfig =
+            new RepartitionTopicConfig(repartitionTopic, Collections.emptyMap());
 
-        final StreamsPartitionAssignor.InternalTopicMetadata metadata
-                = new StreamsPartitionAssignor.InternalTopicMetadata(repartitionTopicConfig);
+        final StreamsPartitionAssignor.InternalTopicMetadata metadata =
+            new StreamsPartitionAssignor.InternalTopicMetadata(repartitionTopicConfig);
         metadata.numPartitions = partitions;
         return metadata;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index f06e129..fabda42 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -35,6 +35,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -44,13 +45,13 @@ import java.util.Set;
 
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.state.internals.RocksDBSessionStoreTest.toList;
+import static org.apache.kafka.streams.state.internals.WrappedSessionStoreIterator.bytesIterator;
 import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
 import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
-
 @SuppressWarnings("PointlessArithmeticExpression")
 public class CachingSessionStoreTest {
 
@@ -144,7 +145,6 @@ public class CachingSessionStoreTest {
         // add one that shouldn't appear in the results
         cachingStore.put(new Windowed<>(keyAA, new SessionWindow(0, 0)), "5".getBytes());
 
-
         final List<KeyValue<Windowed<Bytes>, byte[]>> results = toList(cachingStore.fetch(keyA));
         verifyKeyValueList(expected, results);
     }
@@ -154,7 +154,8 @@ public class CachingSessionStoreTest {
         final StateSerdes<Bytes, byte[]> serdes = new StateSerdes<>("topic", Serdes.Bytes(), Serdes.ByteArray());
         final List<KeyValue<Windowed<Bytes>, byte[]>> added = addSessionsUntilOverflow("a", "b", "c", "d");
         assertEquals(added.size() - 1, cache.size());
-        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = WrappedSessionStoreIterator.bytesIterator(underlying.fetch(added.get(0).key.key(), 0, 0), serdes);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+            bytesIterator(underlying.fetch(added.get(0).key.key(), 0, 0), serdes);
         final KeyValue<Windowed<Bytes>, byte[]> next = iterator.next();
         assertEquals(added.get(0).key, next.key);
         assertArrayEquals(added.get(0).value, next.value);
@@ -163,7 +164,10 @@ public class CachingSessionStoreTest {
     @Test
     public void shouldQueryItemsInCacheAndStore() {
         final List<KeyValue<Windowed<Bytes>, byte[]>> added = addSessionsUntilOverflow("a");
-        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.findSessions(Bytes.wrap("a".getBytes()), 0, added.size() * 10);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.findSessions(
+            Bytes.wrap("a".getBytes(StandardCharsets.UTF_8)),
+            0,
+            added.size() * 10);
         final List<KeyValue<Windowed<Bytes>, byte[]>> actual = toList(iterator);
         verifyKeyValueList(added, actual);
     }
@@ -177,7 +181,8 @@ public class CachingSessionStoreTest {
         cachingStore.flush();
         cachingStore.remove(a);
         cachingStore.flush();
-        final KeyValueIterator<Windowed<Bytes>, byte[]> rangeIter = cachingStore.findSessions(keyA, 0, 0);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> rangeIter =
+            cachingStore.findSessions(keyA, 0, 0);
         assertFalse(rangeIter.hasNext());
     }
 
@@ -190,7 +195,8 @@ public class CachingSessionStoreTest {
         cachingStore.put(a2, "2".getBytes());
         cachingStore.put(a3, "3".getBytes());
         cachingStore.flush();
-        final KeyValueIterator<Windowed<Bytes>, byte[]> results = cachingStore.findSessions(keyA, 0, SEGMENT_INTERVAL * 2);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> results =
+            cachingStore.findSessions(keyA, 0, SEGMENT_INTERVAL * 2);
         assertEquals(a1, results.next().key);
         assertEquals(a2, results.next().key);
         assertEquals(a3, results.next().key);
@@ -211,7 +217,8 @@ public class CachingSessionStoreTest {
         cachingStore.put(aa3, "3".getBytes());
         cachingStore.flush();
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = cachingStore.findSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults =
+            cachingStore.findSessions(keyA, keyAA, 0, SEGMENT_INTERVAL * 2);
         final Set<Windowed<Bytes>> keys = new HashSet<>();
         while (rangeResults.hasNext()) {
             keys.add(rangeResults.next().key);
@@ -294,7 +301,6 @@ public class CachingSessionStoreTest {
         cachingStore.put(a, "2".getBytes());
         cachingStore.flush();
 
-
         assertEquals(
             flushed,
             Arrays.asList(
@@ -395,5 +401,4 @@ public class CachingSessionStoreTest {
         allSessions.add(KeyValue.pair(key, value));
     }
 
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index 44fa62f..610fc60 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -45,6 +45,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
@@ -64,7 +65,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 
-
 public class CachingWindowStoreTest {
 
     private static final int MAX_CACHE_SIZE_BYTES = 150;
@@ -83,7 +83,12 @@ public class CachingWindowStoreTest {
     public void setUp() {
         keySchema = new WindowKeySchema();
         underlying = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0, SEGMENT_INTERVAL, keySchema);
-        final RocksDBWindowStore<Bytes, byte[]> windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false, WINDOW_SIZE);
+        final RocksDBWindowStore<Bytes, byte[]> windowStore = new RocksDBWindowStore<>(
+            underlying,
+            Serdes.Bytes(),
+            Serdes.ByteArray(),
+            false,
+            WINDOW_SIZE);
         cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>();
         cachingStore = new CachingWindowStore<>(windowStore, Serdes.String(), Serdes.String(), WINDOW_SIZE, SEGMENT_INTERVAL);
         cachingStore.setFlushListener(cacheListener, false);
@@ -151,9 +156,7 @@ public class CachingWindowStoreTest {
                 }
 
                 @Override
-                public void close() {
-
-                }
+                public void close() {}
             }, "store-name");
 
         final String bootstrapServers = "localhost:9092";
@@ -169,7 +172,10 @@ public class CachingWindowStoreTest {
         final long initialWallClockTime = 0L;
         final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), streamsConfiguration, initialWallClockTime);
 
-        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(Serdes.String().serializer(), Serdes.String().serializer(), initialWallClockTime);
+        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(
+            Serdes.String().serializer(),
+            Serdes.String().serializer(),
+            initialWallClockTime);
 
         for (int i = 0; i < 5; i++) {
             driver.pipeInput(recordFactory.create(topic, UUID.randomUUID().toString(), UUID.randomUUID().toString()));
@@ -210,7 +216,9 @@ public class CachingWindowStoreTest {
         assertEquals(2, cache.size());
     }
 
-    private void verifyKeyValue(final KeyValue<Long, byte[]> next, final long expectedKey, final String expectedValue) {
+    private void verifyKeyValue(final KeyValue<Long, byte[]> next,
+                                final long expectedKey,
+                                final String expectedValue) {
         assertThat(next.key, equalTo(expectedKey));
         assertThat(next.value, equalTo(bytesValue(expectedValue)));
     }
@@ -228,9 +236,16 @@ public class CachingWindowStoreTest {
         cachingStore.put(bytesKey("a"), bytesValue("a"));
         cachingStore.put(bytesKey("b"), bytesValue("b"));
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(10), ofEpochMilli(10));
-        verifyWindowedKeyValue(iterator.next(), new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), "a");
-        verifyWindowedKeyValue(iterator.next(), new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), "b");
+        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+            cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(10), ofEpochMilli(10));
+        verifyWindowedKeyValue(
+            iterator.next(),
+            new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+            "a");
+        verifyWindowedKeyValue(
+            iterator.next(),
+            new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+            "b");
         assertFalse(iterator.hasNext());
         assertEquals(2, cache.size());
     }
@@ -249,7 +264,10 @@ public class CachingWindowStoreTest {
         final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.all();
         final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
         for (final String s : array) {
-            verifyWindowedKeyValue(iterator.next(), new Windowed<>(bytesKey(s), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), s);
+            verifyWindowedKeyValue(
+                iterator.next(),
+                new Windowed<>(bytesKey(s), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                s);
         }
         assertFalse(iterator.hasNext());
     }
@@ -262,24 +280,36 @@ public class CachingWindowStoreTest {
             cachingStore.put(bytesKey(array[i]), bytesValue(array[i]));
         }
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.fetchAll(ofEpochMilli(0), ofEpochMilli(7));
+        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+            cachingStore.fetchAll(ofEpochMilli(0), ofEpochMilli(7));
         for (int i = 0; i < array.length; i++) {
             final String str = array[i];
-            verifyWindowedKeyValue(iterator.next(), new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), str);
+            verifyWindowedKeyValue(
+                iterator.next(),
+                new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)),
+                str);
         }
         assertFalse(iterator.hasNext());
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 = cachingStore.fetchAll(ofEpochMilli(2), ofEpochMilli(4));
+        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 =
+            cachingStore.fetchAll(ofEpochMilli(2), ofEpochMilli(4));
         for (int i = 2; i <= 4; i++) {
             final String str = array[i];
-            verifyWindowedKeyValue(iterator1.next(), new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), str);
+            verifyWindowedKeyValue(
+                iterator1.next(),
+                new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)),
+                str);
         }
         assertFalse(iterator1.hasNext());
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 = cachingStore.fetchAll(ofEpochMilli(5), ofEpochMilli(7));
+        final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 =
+            cachingStore.fetchAll(ofEpochMilli(5), ofEpochMilli(7));
         for (int i = 5; i <= 7; i++) {
             final String str = array[i];
-            verifyWindowedKeyValue(iterator2.next(), new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)), str);
+            verifyWindowedKeyValue(
+                iterator2.next(),
+                new Windowed<>(bytesKey(str), new TimeWindow(i, i + WINDOW_SIZE)),
+                str);
         }
         assertFalse(iterator2.hasNext());
     }
@@ -288,7 +318,10 @@ public class CachingWindowStoreTest {
     public void shouldFlushEvictedItemsIntoUnderlyingStore() {
         final int added = addItemsToCache();
         // all dirty entries should have been flushed
-        final KeyValueIterator<Bytes, byte[]> iter = underlying.fetch(Bytes.wrap("0".getBytes()), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP);
+        final KeyValueIterator<Bytes, byte[]> iter = underlying.fetch(
+            Bytes.wrap("0".getBytes(StandardCharsets.UTF_8)),
+            DEFAULT_TIMESTAMP,
+            DEFAULT_TIMESTAMP);
         final KeyValue<Bytes, byte[]> next = iter.next();
         assertEquals(DEFAULT_TIMESTAMP, keySchema.segmentTimestamp(next.key));
         assertArrayEquals("0".getBytes(), next.value);
@@ -298,7 +331,8 @@ public class CachingWindowStoreTest {
 
     @Test
     public void shouldForwardDirtyItemsWhenFlushCalled() {
-        final Windowed<String> windowedKey = new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
+        final Windowed<String> windowedKey =
+            new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
         cachingStore.put(bytesKey("1"), bytesValue("a"));
         cachingStore.flush();
         assertEquals("a", cacheListener.forwarded.get(windowedKey).newValue);
@@ -308,7 +342,8 @@ public class CachingWindowStoreTest {
     @Test
     public void shouldForwardOldValuesWhenEnabled() {
         cachingStore.setFlushListener(cacheListener, true);
-        final Windowed<String> windowedKey = new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
+        final Windowed<String> windowedKey =
+            new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
         cachingStore.put(bytesKey("1"), bytesValue("a"));
         cachingStore.flush();
         cachingStore.put(bytesKey("1"), bytesValue("b"));
@@ -319,7 +354,8 @@ public class CachingWindowStoreTest {
 
     @Test
     public void shouldForwardOldValuesWhenDisabled() {
-        final Windowed<String> windowedKey = new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
+        final Windowed<String> windowedKey =
+            new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
         cachingStore.put(bytesKey("1"), bytesValue("a"));
         cachingStore.flush();
         cachingStore.put(bytesKey("1"), bytesValue("b"));
@@ -340,7 +376,8 @@ public class CachingWindowStoreTest {
         cachingStore.flush();
         cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
 
-        final WindowStoreIterator<byte[]> fetch = cachingStore.fetch(bytesKey("1"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP));
+        final WindowStoreIterator<byte[]> fetch =
+            cachingStore.fetch(bytesKey("1"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP));
         verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "b");
         assertFalse(fetch.hasNext());
     }
@@ -350,7 +387,8 @@ public class CachingWindowStoreTest {
         cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
         cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP + WINDOW_SIZE);
 
-        final WindowStoreIterator<byte[]> fetch = cachingStore.fetch(bytesKey("1"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE));
+        final WindowStoreIterator<byte[]> fetch =
+            cachingStore.fetch(bytesKey("1"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE));
         verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
         verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
         assertFalse(fetch.hasNext());
@@ -361,7 +399,8 @@ public class CachingWindowStoreTest {
         final Bytes key = Bytes.wrap("1".getBytes());
         underlying.put(WindowKeySchema.toStoreKeyBinary(key, DEFAULT_TIMESTAMP, 0), "a".getBytes());
         cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP + WINDOW_SIZE);
-        final WindowStoreIterator<byte[]> fetch = cachingStore.fetch(bytesKey("1"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE));
+        final WindowStoreIterator<byte[]> fetch =
+            cachingStore.fetch(bytesKey("1"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE));
         verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
         verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
         assertFalse(fetch.hasNext());
@@ -375,8 +414,14 @@ public class CachingWindowStoreTest {
 
         final KeyValueIterator<Windowed<Bytes>, byte[]> fetchRange =
             cachingStore.fetch(key, bytesKey("2"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE));
-        verifyWindowedKeyValue(fetchRange.next(), new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), "a");
-        verifyWindowedKeyValue(fetchRange.next(), new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP + WINDOW_SIZE, DEFAULT_TIMESTAMP + WINDOW_SIZE + WINDOW_SIZE)), "b");
+        verifyWindowedKeyValue(
+            fetchRange.next(),
+            new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+            "a");
+        verifyWindowedKeyValue(
+            fetchRange.next(),
+            new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP + WINDOW_SIZE, DEFAULT_TIMESTAMP + WINDOW_SIZE + WINDOW_SIZE)),
+            "b");
         assertFalse(fetchRange.hasNext());
     }
 
@@ -419,7 +464,8 @@ public class CachingWindowStoreTest {
             KeyValue.pair(1L, bytesValue("0003")),
             KeyValue.pair(SEGMENT_INTERVAL, bytesValue("0005"))
         );
-        final List<KeyValue<Long, byte[]>> actual = toList(cachingStore.fetch(bytesKey("a"), ofEpochMilli(0), ofEpochMilli(Long.MAX_VALUE)));
+        final List<KeyValue<Long, byte[]>> actual =
+            toList(cachingStore.fetch(bytesKey("a"), ofEpochMilli(0), ofEpochMilli(Long.MAX_VALUE)));
         verifyKeyValueList(expected, actual);
     }
 
@@ -441,7 +487,9 @@ public class CachingWindowStoreTest {
         );
 
         verifyKeyValueList(
-            asList(windowedPair("aa", "0002", 0), windowedPair("aa", "0004", 1)),
+            asList(
+                windowedPair("aa", "0002", 0),
+                windowedPair("aa", "0004", 1)),
             toList(cachingStore.fetch(bytesKey("aa"), bytesKey("aa"), ofEpochMilli(0), ofEpochMilli(Long.MAX_VALUE)))
         );
 
@@ -483,7 +531,9 @@ public class CachingWindowStoreTest {
     }
 
     private static KeyValue<Windowed<Bytes>, byte[]> windowedPair(final String key, final String value, final long timestamp) {
-        return KeyValue.pair(new Windowed<>(bytesKey(key), new TimeWindow(timestamp, timestamp + WINDOW_SIZE)), bytesValue(value));
+        return KeyValue.pair(
+            new Windowed<>(bytesKey(key), new TimeWindow(timestamp, timestamp + WINDOW_SIZE)),
+            bytesValue(value));
     }
 
     private int addItemsToCache() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
index 3ecf6ef..23534ef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java
@@ -110,7 +110,7 @@ public class CompositeReadOnlySessionStoreTest {
         final CompositeReadOnlySessionStore<String, String> store =
             new CompositeReadOnlySessionStore<>(
                 new StateStoreProviderStub(true),
-                QueryableStoreTypes.<String, String>sessionStore(),
+                QueryableStoreTypes.sessionStore(),
                 "whateva");
 
         store.fetch("a");
@@ -155,4 +155,4 @@ public class CompositeReadOnlySessionStoreTest {
     public void shouldThrowNPEIfToKeyIsNull() {
         underlyingSessionStore.fetch("a", null);
     }
-}
\ No newline at end of file
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
index 6df41fc..915d2a3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java
@@ -47,8 +47,7 @@ public class CompositeReadOnlyWindowStoreTest {
     private final String storeName = "window-store";
     private StateStoreProviderStub stubProviderOne;
     private StateStoreProviderStub stubProviderTwo;
-    private CompositeReadOnlyWindowStore<String, String>
-        windowStore;
+    private CompositeReadOnlyWindowStore<String, String> windowStore;
     private ReadOnlyWindowStoreStub<String, String> underlyingWindowStore;
     private ReadOnlyWindowStoreStub<String, String> otherUnderlyingStore;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
index 26438bf..2008d93 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedSessionStoreIteratorTest.java
@@ -57,44 +57,37 @@ public class MergedSortedCacheWrappedSessionStoreIteratorTest {
 
     @Test
     public void shouldHaveNextFromStore() {
-        final MergedSortedCacheSessionStoreIterator mergeIterator
-                = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator());
+        final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator());
         assertTrue(mergeIterator.hasNext());
     }
 
     @Test
     public void shouldGetNextFromStore() {
-        final MergedSortedCacheSessionStoreIterator mergeIterator
-                = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator());
+        final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator());
         assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey.get())));
     }
 
     @Test
     public void shouldPeekNextKeyFromStore() {
-        final MergedSortedCacheSessionStoreIterator mergeIterator
-                = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator());
+        final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(storeKvs, Collections.emptyIterator());
         assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(storeKey, storeWindow)));
     }
 
     @Test
     public void shouldHaveNextFromCache() {
-        final MergedSortedCacheSessionStoreIterator mergeIterator
-                = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(),
-                                 cacheKvs);
+        final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs);
         assertTrue(mergeIterator.hasNext());
     }
 
     @Test
     public void shouldGetNextFromCache() {
-        final MergedSortedCacheSessionStoreIterator mergeIterator
-                = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(), cacheKvs);
+        final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs);
         assertThat(mergeIterator.next(), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey.get())));
     }
 
     @Test
     public void shouldPeekNextKeyFromCache() {
-        final MergedSortedCacheSessionStoreIterator mergeIterator
-                = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(), cacheKvs);
+        final MergedSortedCacheSessionStoreIterator mergeIterator = createIterator(Collections.emptyIterator(), cacheKvs);
         assertThat(mergeIterator.peekNextKey(), equalTo(new Windowed<>(cacheKey, cacheWindow)));
     }
 
@@ -108,11 +101,11 @@ public class MergedSortedCacheWrappedSessionStoreIteratorTest {
 
     private MergedSortedCacheSessionStoreIterator createIterator(final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs,
                                                                  final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs) {
-        final DelegatingPeekingKeyValueIterator<Windowed<Bytes>, byte[]> storeIterator
-                = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(storeKvs));
+        final DelegatingPeekingKeyValueIterator<Windowed<Bytes>, byte[]> storeIterator =
+            new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(storeKvs));
 
-        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator
-                = new DelegatingPeekingKeyValueIterator<>("cache", new KeyValueIteratorStub<>(cacheKvs));
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator =
+            new DelegatingPeekingKeyValueIterator<>("cache", new KeyValueIteratorStub<>(cacheKvs));
         return new MergedSortedCacheSessionStoreIterator(cacheIterator, storeIterator, SINGLE_SEGMENT_CACHE_FUNCTION);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
index 1e926e4..583e635 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
@@ -62,44 +62,43 @@ public class MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest {
 
     @Test
     public void shouldHaveNextFromStore() {
-        final MergedSortedCacheWindowStoreKeyValueIterator mergeIterator
-            = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator());
+        final MergedSortedCacheWindowStoreKeyValueIterator mergeIterator =
+            createIterator(storeKvs, Collections.emptyIterator());
         assertTrue(mergeIterator.hasNext());
     }
 
     @Test
     public void shouldGetNextFromStore() {
-        final MergedSortedCacheWindowStoreKeyValueIterator mergeIterator
-            = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator());
+        final MergedSortedCacheWindowStoreKeyValueIterator mergeIterator =
+            createIterator(storeKvs, Collections.emptyIterator());
         assertThat(convertKeyValuePair(mergeIterator.next()), equalTo(KeyValue.pair(new Windowed<>(storeKey, storeWindow), storeKey)));
     }
 
     @Test
     public void shouldPeekNextKeyFromStore() {
-        final MergedSortedCacheWindowStoreKeyValueIterator mergeIterator
-            = createIterator(storeKvs, Collections.<KeyValue<Bytes, LRUCacheEntry>>emptyIterator());
+        final MergedSortedCacheWindowStoreKeyValueIterator mergeIterator =
+            createIterator(storeKvs, Collections.emptyIterator());
         assertThat(convertWindowedKey(mergeIterator.peekNextKey()), equalTo(new Windowed<>(storeKey, storeWindow)));
     }
 
     @Test
     public void shouldHaveNextFromCache() {
-        final MergedSortedCacheWindowStoreKeyValueIterator mergeIterator
-            = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(),
-                             cacheKvs);
+        final MergedSortedCacheWindowStoreKeyValueIterator mergeIterator =
+            createIterator(Collections.emptyIterator(), cacheKvs);
         assertTrue(mergeIterator.hasNext());
     }
 
     @Test
     public void shouldGetNextFromCache() {
-        final MergedSortedCacheWindowStoreKeyValueIterator mergeIterator
-            = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(), cacheKvs);
+        final MergedSortedCacheWindowStoreKeyValueIterator mergeIterator =
+            createIterator(Collections.emptyIterator(), cacheKvs);
         assertThat(convertKeyValuePair(mergeIterator.next()), equalTo(KeyValue.pair(new Windowed<>(cacheKey, cacheWindow), cacheKey)));
     }
 
     @Test
     public void shouldPeekNextKeyFromCache() {
-        final MergedSortedCacheWindowStoreKeyValueIterator mergeIterator
-            = createIterator(Collections.<KeyValue<Windowed<Bytes>, byte[]>>emptyIterator(), cacheKvs);
+        final MergedSortedCacheWindowStoreKeyValueIterator mergeIterator =
+            createIterator(Collections.emptyIterator(), cacheKvs);
         assertThat(convertWindowedKey(mergeIterator.peekNextKey()), equalTo(new Windowed<>(cacheKey, cacheWindow)));
     }
 
@@ -126,11 +125,11 @@ public class MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest {
         final Iterator<KeyValue<Windowed<Bytes>, byte[]>> storeKvs,
         final Iterator<KeyValue<Bytes, LRUCacheEntry>> cacheKvs
     ) {
-        final DelegatingPeekingKeyValueIterator<Windowed<Bytes>, byte[]> storeIterator
-            = new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(storeKvs));
+        final DelegatingPeekingKeyValueIterator<Windowed<Bytes>, byte[]> storeIterator =
+            new DelegatingPeekingKeyValueIterator<>("store", new KeyValueIteratorStub<>(storeKvs));
 
-        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator
-            = new DelegatingPeekingKeyValueIterator<>("cache", new KeyValueIteratorStub<>(cacheKvs));
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator =
+            new DelegatingPeekingKeyValueIterator<>("cache", new KeyValueIteratorStub<>(cacheKvs));
         return new MergedSortedCacheWindowStoreKeyValueIterator(
             cacheIterator,
             storeIterator,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index 2a35342..3ebeee5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -52,18 +52,27 @@ public class RocksDBSessionStoreTest {
         final SessionKeySchema schema = new SessionKeySchema();
         schema.init("topic");
 
-        final RocksDBSegmentedBytesStore bytesStore =
-                new RocksDBSegmentedBytesStore("session-store", "metrics-scope", 10_000L, 60_000L, schema);
-
-        sessionStore = new RocksDBSessionStore<>(bytesStore,
-                                                 Serdes.String(),
-                                                 Serdes.Long());
-
-        context = new InternalMockProcessorContext(TestUtils.tempDirectory(),
-                                           Serdes.String(),
-                                           Serdes.Long(),
-                                           new NoOpRecordCollector(),
-                                           new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
+        final RocksDBSegmentedBytesStore bytesStore = new RocksDBSegmentedBytesStore(
+            "session-store",
+            "metrics-scope",
+            10_000L,
+            60_000L,
+            schema);
+
+        sessionStore = new RocksDBSessionStore<>(
+            bytesStore,
+            Serdes.String(),
+            Serdes.Long());
+
+        context = new InternalMockProcessorContext(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.Long(),
+            new NoOpRecordCollector(),
+            new ThreadCache(
+                new LogContext("testCache "),
+                0,
+                new MockStreamsMetrics(new Metrics())));
         sessionStore.init(context, sessionStore);
     }
 
@@ -85,17 +94,19 @@ public class RocksDBSessionStoreTest {
         final List<KeyValue<Windowed<String>, Long>> expected =
             Arrays.asList(KeyValue.pair(a1, 1L), KeyValue.pair(a2, 2L));
 
-        final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(key, 0, 1000L);
+        final KeyValueIterator<Windowed<String>, Long> values =
+            sessionStore.findSessions(key, 0, 1000L);
         assertEquals(expected, toList(values));
     }
 
     @Test
     public void shouldFetchAllSessionsWithSameRecordKey() {
+        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
+            KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L),
+            KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L),
+            KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L),
+            KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L));
 
-        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(KeyValue.pair(new Windowed<>("a", new SessionWindow(0, 0)), 1L),
-                                                                                    KeyValue.pair(new Windowed<>("a", new SessionWindow(10, 10)), 2L),
-                                                                                    KeyValue.pair(new Windowed<>("a", new SessionWindow(100, 100)), 3L),
-                                                                                    KeyValue.pair(new Windowed<>("a", new SessionWindow(1000, 1000)), 4L));
         for (final KeyValue<Windowed<String>, Long> kv : expected) {
             sessionStore.put(kv.key, kv.value);
         }
@@ -105,20 +116,20 @@ public class RocksDBSessionStoreTest {
 
         final List<KeyValue<Windowed<String>, Long>> results = toList(sessionStore.fetch("a"));
         assertEquals(expected, results);
-
     }
 
-
     @Test
     public void shouldFindValuesWithinMergingSessionWindowRange() {
         final String key = "a";
         sessionStore.put(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L);
         sessionStore.put(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L);
-        final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessions(key, -1, 1000L);
+        final KeyValueIterator<Windowed<String>, Long> results =
+            sessionStore.findSessions(key, -1, 1000L);
 
         final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
                 KeyValue.pair(new Windowed<>(key, new SessionWindow(0L, 0L)), 1L),
                 KeyValue.pair(new Windowed<>(key, new SessionWindow(1000L, 1000L)), 2L));
+
         assertEquals(expected, toList(results));
     }
 
@@ -145,7 +156,8 @@ public class RocksDBSessionStoreTest {
         sessionStore.put(session3, 3L);
         sessionStore.put(session4, 4L);
         sessionStore.put(session5, 5L);
-        final KeyValueIterator<Windowed<String>, Long> results = sessionStore.findSessions("a", 150, 300);
+        final KeyValueIterator<Windowed<String>, Long> results =
+            sessionStore.findSessions("a", 150, 300);
         assertEquals(session2, results.next().key);
         assertEquals(session3, results.next().key);
         assertFalse(results.hasNext());
@@ -153,12 +165,17 @@ public class RocksDBSessionStoreTest {
 
     @Test
     public void shouldFetchExactKeys() {
-        final RocksDBSegmentedBytesStore bytesStore =
-                new RocksDBSegmentedBytesStore("session-store", "metrics-scope", 0x7a00000000000000L, 0x7a00000000000000L, new SessionKeySchema());
-
-        sessionStore = new RocksDBSessionStore<>(bytesStore,
-                                                 Serdes.String(),
-                                                 Serdes.Long());
+        final RocksDBSegmentedBytesStore bytesStore = new RocksDBSegmentedBytesStore(
+            "session-store",
+            "metrics-scope",
+            0x7a00000000000000L,
+            0x7a00000000000000L,
+            new SessionKeySchema());
+
+        sessionStore = new RocksDBSessionStore<>(
+            bytesStore,
+            Serdes.String(),
+            Serdes.Long());
 
         sessionStore.init(context, sessionStore);
 
@@ -168,7 +185,8 @@ public class RocksDBSessionStoreTest {
         sessionStore.put(new Windowed<>("aa", new SessionWindow(10, 20)), 4L);
         sessionStore.put(new Windowed<>("a", new SessionWindow(0x7a00000000000000L - 2, 0x7a00000000000000L - 1)), 5L);
 
-        KeyValueIterator<Windowed<String>, Long> iterator = sessionStore.findSessions("a", 0, Long.MAX_VALUE);
+        KeyValueIterator<Windowed<String>, Long> iterator =
+            sessionStore.findSessions("a", 0, Long.MAX_VALUE);
         List<Long> results = new ArrayList<>();
         while (iterator.hasNext()) {
             results.add(iterator.next().value);
@@ -186,7 +204,8 @@ public class RocksDBSessionStoreTest {
         assertThat(results, equalTo(Arrays.asList(2L, 4L)));
 
 
-        final KeyValueIterator<Windowed<String>, Long> rangeIterator = sessionStore.findSessions("a", "aa", 0, Long.MAX_VALUE);
+        final KeyValueIterator<Windowed<String>, Long> rangeIterator =
+            sessionStore.findSessions("a", "aa", 0, Long.MAX_VALUE);
         final List<Long> rangeResults = new ArrayList<>();
         while (rangeIterator.hasNext()) {
             rangeResults.add(rangeIterator.next().value);
@@ -242,5 +261,4 @@ public class RocksDBSessionStoreTest {
         return results;
     }
 
-
 }
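
(Aside on the helper the assertions above rely on: the "@@ -242,5 +261,4 @@" hunk shows only the tail of the session-store test's toList(...) helper. The following is a rough, hypothetical reconstruction of such a helper -- drain a KeyValueIterator into a plain List so it can be compared with assertEquals -- it is illustrative only and not copied from the commit; the window-store test uses a similar helper that collects just the values.)

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;

// Hypothetical reconstruction of a toList(...) test helper: collect the
// remaining entries of a KeyValueIterator into a List for assertEquals.
final class IteratorTestUtils {
    private IteratorTestUtils() {}

    static <K, V> List<KeyValue<K, V>> toList(final KeyValueIterator<K, V> iterator) {
        final List<KeyValue<K, V>> results = new ArrayList<>();
        while (iterator.hasNext()) {
            results.add(iterator.next());
        }
        return results;
    }
}
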
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index b77b02d..67c2d5d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -87,11 +87,14 @@ public class RocksDBStoreTest {
         restoreListener.onRestoreStart(null, rocksDBStore.name(), 0L, 0L);
 
         assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
+        assertThat(rocksDBStore.getOptions().level0SlowdownWritesTrigger(), equalTo(1 << 30));
+        assertThat(rocksDBStore.getOptions().level0StopWritesTrigger(), equalTo(1 << 30));
 
         restoreListener.onRestoreEnd(null, rocksDBStore.name(), 0L);
 
         assertThat(rocksDBStore.getOptions().level0FileNumCompactionTrigger(), equalTo(10));
-    }
+        assertThat(rocksDBStore.getOptions().level0SlowdownWritesTrigger(), equalTo(20));
+        assertThat(rocksDBStore.getOptions().level0StopWritesTrigger(), equalTo(36));
+    }
 
     @Test
     public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() {
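
(Context for the new level-0 assertions above: the restore listener is expected to switch RocksDB into a bulk-load-friendly mode while a changelog restore is in progress -- compaction and write-stall triggers pushed effectively out of reach -- and to put the asserted values back once restoration ends. Below is a minimal sketch of that toggle using only public org.rocksdb.Options setters; the method names echo the listener callbacks the test invokes, the values mirror what the test asserts, and none of it is the actual RocksDBStore internals.)

import org.rocksdb.Options;

// Illustrative sketch only: before/after values match what the test asserts,
// not necessarily every option RocksDBStore tweaks internally.
final class BulkLoadToggleSketch {
    private static final int EFFECTIVELY_DISABLED = 1 << 30;

    static void onRestoreStart(final Options options) {
        // Push the level-0 triggers out of reach so restoration can write freely.
        options.setLevel0FileNumCompactionTrigger(EFFECTIVELY_DISABLED);
        options.setLevel0SlowdownWritesTrigger(EFFECTIVELY_DISABLED);
        options.setLevel0StopWritesTrigger(EFFECTIVELY_DISABLED);
    }

    static void onRestoreEnd(final Options options) {
        // Return to the values the test expects after restoration.
        options.setLevel0FileNumCompactionTrigger(10);
        options.setLevel0SlowdownWritesTrigger(20);
        options.setLevel0StopWritesTrigger(36);
    }
}
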
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 09c5e5b..c155a83 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -45,8 +45,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -78,9 +78,13 @@ public class RocksDBWindowStoreTest {
     private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", Serdes.Integer(), Serdes.String());
 
     private final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
-    private final ThreadCache cache = new ThreadCache(new LogContext("TestCache "), DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
+    private final ThreadCache cache = new ThreadCache(
+        new LogContext("TestCache "),
+        DEFAULT_CACHE_SIZE_BYTES,
+        new MockStreamsMetrics(new Metrics()));
 
-    private final Producer<byte[], byte[]> producer = new MockProducer<>(true, Serdes.ByteArray().serializer(), Serdes.ByteArray().serializer());
+    private final Producer<byte[], byte[]> producer =
+        new MockProducer<>(true, Serdes.ByteArray().serializer(), Serdes.ByteArray().serializer());
     private final RecordCollector recordCollector = new RecordCollectorImpl(
         "RocksDBWindowStoreTestTask",
         new LogContext("RocksDBWindowStoreTestTask "),
@@ -104,12 +108,17 @@ public class RocksDBWindowStoreTest {
     };
 
     private final File baseDir = TestUtils.tempDirectory("test");
-    private final InternalMockProcessorContext context = new InternalMockProcessorContext(baseDir, Serdes.ByteArray(), Serdes.ByteArray(), recordCollector, cache);
+    private final InternalMockProcessorContext context =
+        new InternalMockProcessorContext(baseDir, Serdes.ByteArray(), Serdes.ByteArray(), recordCollector, cache);
     private WindowStore<Integer, String> windowStore;
 
     private WindowStore<Integer, String> createWindowStore(final ProcessorContext context, final boolean retainDuplicates) {
         final WindowStore<Integer, String> store = Stores.windowStoreBuilder(
-            Stores.persistentWindowStore(windowName, ofMillis(retentionPeriod), ofMillis(windowSize), retainDuplicates),
+            Stores.persistentWindowStore(
+                windowName,
+                ofMillis(retentionPeriod),
+                ofMillis(windowSize),
+                retainDuplicates),
             Serdes.Integer(),
             Serdes.String()).build();
 
@@ -179,12 +188,12 @@ public class RocksDBWindowStoreTest {
         assertEquals("four", windowStore.fetch(4, startTime + 4L));
         assertEquals("five", windowStore.fetch(5, startTime + 5L));
 
-        assertEquals(asList("zero"), toList(windowStore.fetch(0, ofEpochMilli(startTime + 0L - windowSize), ofEpochMilli(startTime + 0L + windowSize))));
-        assertEquals(asList("one"), toList(windowStore.fetch(1, ofEpochMilli(startTime + 1L - windowSize), ofEpochMilli(startTime + 1L + windowSize))));
-        assertEquals(asList("two"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 2L - windowSize), ofEpochMilli(startTime + 2L + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(3, ofEpochMilli(startTime + 3L - windowSize), ofEpochMilli(startTime + 3L + windowSize))));
-        assertEquals(asList("four"), toList(windowStore.fetch(4, ofEpochMilli(startTime + 4L - windowSize), ofEpochMilli(startTime + 4L + windowSize))));
-        assertEquals(asList("five"), toList(windowStore.fetch(5, ofEpochMilli(startTime + 5L - windowSize), ofEpochMilli(startTime + 5L + windowSize))));
+        assertEquals(
+            Collections.singletonList("zero"),
+            toList(windowStore.fetch(
+                0,
+                ofEpochMilli(startTime + 0 - windowSize),
+                ofEpochMilli(startTime + 0 + windowSize))));
 
         putSecondBatch(windowStore, startTime, context);
 
@@ -195,21 +204,96 @@ public class RocksDBWindowStoreTest {
         assertEquals("two+5", windowStore.fetch(2, startTime + 7L));
         assertEquals("two+6", windowStore.fetch(2, startTime + 8L));
 
-        assertEquals(asList(), toList(windowStore.fetch(2, ofEpochMilli(startTime - 2L - windowSize), ofEpochMilli(startTime - 2L + windowSize))));
-        assertEquals(asList("two"), toList(windowStore.fetch(2, ofEpochMilli(startTime - 1L - windowSize), ofEpochMilli(startTime - 1L + windowSize))));
-        assertEquals(asList("two", "two+1"), toList(windowStore.fetch(2, ofEpochMilli(startTime - windowSize), ofEpochMilli(startTime + windowSize))));
-        assertEquals(asList("two", "two+1", "two+2"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 1L - windowSize), ofEpochMilli(startTime + 1L + windowSize))));
-        assertEquals(asList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 2L - windowSize), ofEpochMilli(startTime + 2L + windowSize))));
-        assertEquals(asList("two", "two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 3L - windowSize), ofEpochMilli(startTime + 3L + windowSize))));
-        assertEquals(asList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 4L - windowSize), ofEpochMilli(startTime + 4L + windowSize))));
-        assertEquals(asList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 5L - windowSize), ofEpochMilli(startTime + 5L + windowSize))));
-        assertEquals(asList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 6L - windowSize), ofEpochMilli(startTime + 6L + windowSize))));
-        assertEquals(asList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 7L - windowSize), ofEpochMilli(startTime + 7L + windowSize))));
-        assertEquals(asList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 8L - windowSize), ofEpochMilli(startTime + 8L + windowSize))));
-        assertEquals(asList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 9L - windowSize), ofEpochMilli(startTime + 9L + windowSize))));
-        assertEquals(asList("two+5", "two+6"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 10L - windowSize), ofEpochMilli(startTime + 10L + windowSize))));
-        assertEquals(asList("two+6"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 11L - windowSize), ofEpochMilli(startTime + 11L + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(2, ofEpochMilli(startTime + 12L - windowSize), ofEpochMilli(startTime + 12L + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime - 2L - windowSize),
+                ofEpochMilli(startTime - 2L + windowSize))));
+        assertEquals(
+            Collections.singletonList("two"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime - 1L - windowSize),
+                ofEpochMilli(startTime - 1L + windowSize))));
+        assertEquals(
+            asList("two", "two+1"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime - windowSize),
+                ofEpochMilli(startTime + windowSize))));
+        assertEquals(
+            asList("two", "two+1", "two+2"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 1L - windowSize),
+                ofEpochMilli(startTime + 1L + windowSize))));
+        assertEquals(
+            asList("two", "two+1", "two+2", "two+3"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 2L - windowSize),
+                ofEpochMilli(startTime + 2L + windowSize))));
+        assertEquals(
+            asList("two", "two+1", "two+2", "two+3", "two+4"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 3L - windowSize),
+                ofEpochMilli(startTime + 3L + windowSize))));
+        assertEquals(
+            asList("two", "two+1", "two+2", "two+3", "two+4", "two+5"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 4L - windowSize),
+                ofEpochMilli(startTime + 4L + windowSize))));
+        assertEquals(
+            asList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 5L - windowSize),
+                ofEpochMilli(startTime + 5L + windowSize))));
+        assertEquals(
+            asList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 6L - windowSize),
+                ofEpochMilli(startTime + 6L + windowSize))));
+        assertEquals(
+            asList("two+2", "two+3", "two+4", "two+5", "two+6"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 7L - windowSize),
+                ofEpochMilli(startTime + 7L + windowSize))));
+        assertEquals(
+            asList("two+3", "two+4", "two+5", "two+6"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 8L - windowSize),
+                ofEpochMilli(startTime + 8L + windowSize))));
+        assertEquals(
+            asList("two+4", "two+5", "two+6"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 9L - windowSize),
+                ofEpochMilli(startTime + 9L + windowSize))));
+        assertEquals(
+            asList("two+5", "two+6"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 10L - windowSize),
+                ofEpochMilli(startTime + 10L + windowSize))));
+        assertEquals(
+            Collections.singletonList("two+6"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 11L - windowSize),
+                ofEpochMilli(startTime + 11L + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 12L - windowSize),
+                ofEpochMilli(startTime + 12L + windowSize))));
 
         // Flush the store and verify all current entries were properly flushed ...
         windowStore.flush();
@@ -261,12 +345,10 @@ public class RocksDBWindowStoreTest {
             asList(one, two, four),
             StreamsTestUtils.toList(windowStore.fetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 4)))
         );
-
         assertEquals(
             asList(zero, one, two),
             StreamsTestUtils.toList(windowStore.fetchAll(ofEpochMilli(startTime + 0), ofEpochMilli(startTime + 3)))
         );
-
         assertEquals(
             asList(one, two, four, five),
             StreamsTestUtils.toList(windowStore.fetchAll(ofEpochMilli(startTime + 1), ofEpochMilli(startTime + 5)))
@@ -288,36 +370,67 @@ public class RocksDBWindowStoreTest {
 
         assertEquals(
             asList(zero, one),
-            StreamsTestUtils.toList(windowStore.fetch(0, 1, ofEpochMilli(startTime + 0L - windowSize), ofEpochMilli(startTime + 0L + windowSize)))
+            StreamsTestUtils.toList(windowStore.fetch(
+                0,
+                1,
+                ofEpochMilli(startTime + 0L - windowSize),
+                ofEpochMilli(startTime + 0L + windowSize)))
         );
         assertEquals(
-            asList(one),
-            StreamsTestUtils.toList(windowStore.fetch(1, 1, ofEpochMilli(startTime + 0L - windowSize), ofEpochMilli(startTime + 0L + windowSize)))
+            Collections.singletonList(one),
+            StreamsTestUtils.toList(windowStore.fetch(
+                1,
+                1,
+                ofEpochMilli(startTime + 0L - windowSize),
+                ofEpochMilli(startTime + 0L + windowSize)))
         );
         assertEquals(
             asList(one, two),
-            StreamsTestUtils.toList(windowStore.fetch(1, 3, ofEpochMilli(startTime + 0L - windowSize), ofEpochMilli(startTime + 0L + windowSize)))
+            StreamsTestUtils.toList(windowStore.fetch(
+                1,
+                3,
+                ofEpochMilli(startTime + 0L - windowSize),
+                ofEpochMilli(startTime + 0L + windowSize)))
         );
         assertEquals(
             asList(zero, one, two),
-            StreamsTestUtils.toList(windowStore.fetch(0, 5, ofEpochMilli(startTime + 0L - windowSize), ofEpochMilli(startTime + 0L + windowSize)))
+            StreamsTestUtils.toList(windowStore.fetch(
+                0,
+                5,
+                ofEpochMilli(startTime + 0L - windowSize),
+                ofEpochMilli(startTime + 0L + windowSize)))
         );
         assertEquals(
-            asList(zero, one, two,
-                four, five),
-            StreamsTestUtils.toList(windowStore.fetch(0, 5, ofEpochMilli(startTime + 0L - windowSize), ofEpochMilli(startTime + 0L + windowSize + 5L)))
+            asList(zero, one, two, four, five),
+            StreamsTestUtils.toList(windowStore.fetch(
+                0,
+                5,
+                ofEpochMilli(startTime + 0L - windowSize),
+                ofEpochMilli(startTime + 0L + windowSize + 5L)))
         );
         assertEquals(
             asList(two, four, five),
-            StreamsTestUtils.toList(windowStore.fetch(0, 5, ofEpochMilli(startTime + 2L), ofEpochMilli(startTime + 0L + windowSize + 5L)))
+            StreamsTestUtils.toList(windowStore.fetch(
+                0,
+                5,
+                ofEpochMilli(startTime + 2L),
+                ofEpochMilli(startTime + 0L + windowSize + 5L)))
         );
         assertEquals(
-            asList(),
-            StreamsTestUtils.toList(windowStore.fetch(4, 5, ofEpochMilli(startTime + 2L), ofEpochMilli(startTime + windowSize)))
+            Collections.emptyList(),
+            StreamsTestUtils.toList(windowStore.fetch(
+                4,
+                5,
+                ofEpochMilli(startTime + 2L),
+                ofEpochMilli(startTime + windowSize)))
         );
         assertEquals(
-            asList(),
-            StreamsTestUtils.toList(windowStore.fetch(0, 3, ofEpochMilli(startTime + 3L), ofEpochMilli(startTime + windowSize + 5)))
+            Collections.emptyList(),
+            StreamsTestUtils.toList(windowStore.fetch(
+                0,
+                3,
+                ofEpochMilli(startTime + 3L),
+                ofEpochMilli(startTime + windowSize + 5)))
         );
     }
 
@@ -328,39 +441,145 @@ public class RocksDBWindowStoreTest {
 
         putFirstBatch(windowStore, startTime, context);
 
-        assertEquals(asList("zero"), toList(windowStore.fetch(0, ofEpochMilli(startTime + 0L - windowSize), ofEpochMilli(startTime + 0L))));
-        assertEquals(asList("one"), toList(windowStore.fetch(1, ofEpochMilli(startTime + 1L - windowSize), ofEpochMilli(startTime + 1L))));
-        assertEquals(asList("two"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 2L - windowSize), ofEpochMilli(startTime + 2L))));
-        assertEquals(asList(), toList(windowStore.fetch(3, ofEpochMilli(startTime + 3L - windowSize), ofEpochMilli(startTime + 3L))));
-        assertEquals(asList("four"), toList(windowStore.fetch(4, ofEpochMilli(startTime + 4L - windowSize), ofEpochMilli(startTime + 4L))));
-        assertEquals(asList("five"), toList(windowStore.fetch(5, ofEpochMilli(startTime + 5L - windowSize), ofEpochMilli(startTime + 5L))));
+        assertEquals(
+            Collections.singletonList("zero"),
+            toList(windowStore.fetch(
+                0,
+                ofEpochMilli(startTime + 0L - windowSize),
+                ofEpochMilli(startTime + 0L))));
+        assertEquals(
+            Collections.singletonList("one"),
+            toList(windowStore.fetch(
+                1,
+                ofEpochMilli(startTime + 1L - windowSize),
+                ofEpochMilli(startTime + 1L))));
+        assertEquals(
+            Collections.singletonList("two"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 2L - windowSize),
+                ofEpochMilli(startTime + 2L))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                3,
+                ofEpochMilli(startTime + 3L - windowSize),
+                ofEpochMilli(startTime + 3L))));
+        assertEquals(
+            Collections.singletonList("four"),
+            toList(windowStore.fetch(
+                4,
+                ofEpochMilli(startTime + 4L - windowSize),
+                ofEpochMilli(startTime + 4L))));
+        assertEquals(
+            Collections.singletonList("five"),
+            toList(windowStore.fetch(
+                5,
+                ofEpochMilli(startTime + 5L - windowSize),
+                ofEpochMilli(startTime + 5L))));
 
         putSecondBatch(windowStore, startTime, context);
 
-        assertEquals(asList(), toList(windowStore.fetch(2, ofEpochMilli(startTime - 1L - windowSize), ofEpochMilli(startTime - 1L))));
-        assertEquals(asList(), toList(windowStore.fetch(2, ofEpochMilli(startTime + 0L - windowSize), ofEpochMilli(startTime + 0L))));
-        assertEquals(asList(), toList(windowStore.fetch(2, ofEpochMilli(startTime + 1L - windowSize), ofEpochMilli(startTime + 1L))));
-        assertEquals(asList("two"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 2L - windowSize), ofEpochMilli(startTime + 2L))));
-        assertEquals(asList("two", "two+1"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 3L - windowSize), ofEpochMilli(startTime + 3L))));
-        assertEquals(asList("two", "two+1", "two+2"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 4L - windowSize), ofEpochMilli(startTime + 4L))));
-        assertEquals(asList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 5L - windowSize), ofEpochMilli(startTime + 5L))));
-        assertEquals(asList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 6L - windowSize), ofEpochMilli(startTime + 6L))));
-        assertEquals(asList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 7L - windowSize), ofEpochMilli(startTime + 7L))));
-        assertEquals(asList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 8L - windowSize), ofEpochMilli(startTime + 8L))));
-        assertEquals(asList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 9L - windowSize), ofEpochMilli(startTime + 9L))));
-        assertEquals(asList("two+5", "two+6"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 10L - windowSize), ofEpochMilli(startTime + 10L))));
-        assertEquals(asList("two+6"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 11L - windowSize), ofEpochMilli(startTime + 11L))));
-        assertEquals(asList(), toList(windowStore.fetch(2, ofEpochMilli(startTime + 12L - windowSize), ofEpochMilli(startTime + 12L))));
-        assertEquals(asList(), toList(windowStore.fetch(2, ofEpochMilli(startTime + 13L - windowSize), ofEpochMilli(startTime + 13L))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime - 1L - windowSize),
+                ofEpochMilli(startTime - 1L))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 0L - windowSize),
+                ofEpochMilli(startTime + 0L))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 1L - windowSize),
+                ofEpochMilli(startTime + 1L))));
+        assertEquals(
+            Collections.singletonList("two"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 2L - windowSize),
+                ofEpochMilli(startTime + 2L))));
+        assertEquals(
+            asList("two", "two+1"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 3L - windowSize),
+                ofEpochMilli(startTime + 3L))));
+        assertEquals(
+            asList("two", "two+1", "two+2"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 4L - windowSize),
+                ofEpochMilli(startTime + 4L))));
+        assertEquals(
+            asList("two", "two+1", "two+2", "two+3"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 5L - windowSize),
+                ofEpochMilli(startTime + 5L))));
+        assertEquals(
+            asList("two+1", "two+2", "two+3", "two+4"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 6L - windowSize),
+                ofEpochMilli(startTime + 6L))));
+        assertEquals(
+            asList("two+2", "two+3", "two+4", "two+5"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 7L - windowSize),
+                ofEpochMilli(startTime + 7L))));
+        assertEquals(
+            asList("two+3", "two+4", "two+5", "two+6"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 8L - windowSize),
+                ofEpochMilli(startTime + 8L))));
+        assertEquals(
+            asList("two+4", "two+5", "two+6"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 9L - windowSize),
+                ofEpochMilli(startTime + 9L))));
+        assertEquals(
+            asList("two+5", "two+6"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 10L - windowSize),
+                ofEpochMilli(startTime + 10L))));
+        assertEquals(
+            Collections.singletonList("two+6"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 11L - windowSize),
+                ofEpochMilli(startTime + 11L))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 12L - windowSize),
+                ofEpochMilli(startTime + 12L))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + 13L - windowSize),
+                ofEpochMilli(startTime + 13L))));
 
         // Flush the store and verify all current entries were properly flushed ...
         windowStore.flush();
 
         final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
-
         assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
         assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
-        assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+        assertEquals(
+            Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"),
+            entriesByKey.get(2));
         assertNull(entriesByKey.get(3));
         assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
         assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
@@ -374,30 +593,72 @@ public class RocksDBWindowStoreTest {
 
         putFirstBatch(windowStore, startTime, context);
 
-        assertEquals(asList("zero"), toList(windowStore.fetch(0, ofEpochMilli(startTime + 0L), ofEpochMilli(startTime + 0L + windowSize))));
-        assertEquals(asList("one"), toList(windowStore.fetch(1, ofEpochMilli(startTime + 1L), ofEpochMilli(startTime + 1L + windowSize))));
-        assertEquals(asList("two"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 2L), ofEpochMilli(startTime + 2L + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(3, ofEpochMilli(startTime + 3L), ofEpochMilli(startTime + 3L + windowSize))));
-        assertEquals(asList("four"), toList(windowStore.fetch(4, ofEpochMilli(startTime + 4L), ofEpochMilli(startTime + 4L + windowSize))));
-        assertEquals(asList("five"), toList(windowStore.fetch(5, ofEpochMilli(startTime + 5L), ofEpochMilli(startTime + 5L + windowSize))));
+        assertEquals(
+            Collections.singletonList("zero"),
+            toList(windowStore.fetch(0, ofEpochMilli(startTime + 0L), ofEpochMilli(startTime + 0L + windowSize))));
+        assertEquals(
+            Collections.singletonList("one"),
+            toList(windowStore.fetch(1, ofEpochMilli(startTime + 1L), ofEpochMilli(startTime + 1L + windowSize))));
+        assertEquals(
+            Collections.singletonList("two"),
+            toList(windowStore.fetch(2, ofEpochMilli(startTime + 2L), ofEpochMilli(startTime + 2L + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(3, ofEpochMilli(startTime + 3L), ofEpochMilli(startTime + 3L + windowSize))));
+        assertEquals(
+            Collections.singletonList("four"),
+            toList(windowStore.fetch(4, ofEpochMilli(startTime + 4L), ofEpochMilli(startTime + 4L + windowSize))));
+        assertEquals(
+            Collections.singletonList("five"),
+            toList(windowStore.fetch(5, ofEpochMilli(startTime + 5L), ofEpochMilli(startTime + 5L + windowSize))));
 
         putSecondBatch(windowStore, startTime, context);
 
-        assertEquals(asList(), toList(windowStore.fetch(2, ofEpochMilli(startTime - 2L), ofEpochMilli(startTime - 2L + windowSize))));
-        assertEquals(asList("two"), toList(windowStore.fetch(2, ofEpochMilli(startTime - 1L), ofEpochMilli(startTime - 1L + windowSize))));
-        assertEquals(asList("two", "two+1"), toList(windowStore.fetch(2, ofEpochMilli(startTime), ofEpochMilli(startTime + windowSize))));
-        assertEquals(asList("two", "two+1", "two+2"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 1L), ofEpochMilli(startTime + 1L + windowSize))));
-        assertEquals(asList("two", "two+1", "two+2", "two+3"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 2L), ofEpochMilli(startTime + 2L + windowSize))));
-        assertEquals(asList("two+1", "two+2", "two+3", "two+4"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 3L), ofEpochMilli(startTime + 3L + windowSize))));
-        assertEquals(asList("two+2", "two+3", "two+4", "two+5"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 4L), ofEpochMilli(startTime + 4L + windowSize))));
-        assertEquals(asList("two+3", "two+4", "two+5", "two+6"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 5L), ofEpochMilli(startTime + 5L + windowSize))));
-        assertEquals(asList("two+4", "two+5", "two+6"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 6L), ofEpochMilli(startTime + 6L + windowSize))));
-        assertEquals(asList("two+5", "two+6"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 7L), ofEpochMilli(startTime + 7L + windowSize))));
-        assertEquals(asList("two+6"), toList(windowStore.fetch(2, ofEpochMilli(startTime + 8L), ofEpochMilli(startTime + 8L + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(2, ofEpochMilli(startTime + 9L), ofEpochMilli(startTime + 9L + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(2, ofEpochMilli(startTime + 10L), ofEpochMilli(startTime + 10L + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(2, ofEpochMilli(startTime + 11L), ofEpochMilli(startTime + 11L + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(2, ofEpochMilli(startTime + 12L), ofEpochMilli(startTime + 12L + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(2, ofEpochMilli(startTime - 2L), ofEpochMilli(startTime - 2L + windowSize))));
+        assertEquals(
+            Collections.singletonList("two"),
+            toList(windowStore.fetch(2, ofEpochMilli(startTime - 1L), ofEpochMilli(startTime - 1L + windowSize))));
+        assertEquals(
+            asList("two", "two+1"),
+            toList(windowStore.fetch(2, ofEpochMilli(startTime), ofEpochMilli(startTime + windowSize))));
+        assertEquals(
+            asList("two", "two+1", "two+2"),
+            toList(windowStore.fetch(2, ofEpochMilli(startTime + 1L), ofEpochMilli(startTime + 1L + windowSize))));
+        assertEquals(
+            asList("two", "two+1", "two+2", "two+3"),
+            toList(windowStore.fetch(2, ofEpochMilli(startTime + 2L), ofEpochMilli(startTime + 2L + windowSize))));
+        assertEquals(
+            asList("two+1", "two+2", "two+3", "two+4"),
+            toList(windowStore.fetch(2, ofEpochMilli(startTime + 3L), ofEpochMilli(startTime + 3L + windowSize))));
+        assertEquals(
+            asList("two+2", "two+3", "two+4", "two+5"),
+            toList(windowStore.fetch(2, ofEpochMilli(startTime + 4L), ofEpochMilli(startTime + 4L + windowSize))));
+        assertEquals(
+            asList("two+3", "two+4", "two+5", "two+6"),
+            toList(windowStore.fetch(2, ofEpochMilli(startTime + 5L), ofEpochMilli(startTime + 5L + windowSize))));
+        assertEquals(
+            asList("two+4", "two+5", "two+6"),
+            toList(windowStore.fetch(2, ofEpochMilli(startTime + 6L), ofEpochMilli(startTime + 6L + windowSize))));
+        assertEquals(
+            asList("two+5", "two+6"),
+            toList(windowStore.fetch(2, ofEpochMilli(startTime + 7L), ofEpochMilli(startTime + 7L + windowSize))));
+        assertEquals(
+            Collections.singletonList("two+6"),
+            toList(windowStore.fetch(2, ofEpochMilli(startTime + 8L), ofEpochMilli(startTime + 8L + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(2, ofEpochMilli(startTime + 9L), ofEpochMilli(startTime + 9L + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(2, ofEpochMilli(startTime + 10L), ofEpochMilli(startTime + 10L + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(2, ofEpochMilli(startTime + 11L), ofEpochMilli(startTime + 11L + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(2, ofEpochMilli(startTime + 12L), ofEpochMilli(startTime + 12L + windowSize))));
 
         // Flush the store and verify all current entries were properly flushed ...
         windowStore.flush();
@@ -406,7 +667,9 @@ public class RocksDBWindowStoreTest {
 
         assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
         assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
-        assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+        assertEquals(
+            Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"),
+            entriesByKey.get(2));
         assertNull(entriesByKey.get(3));
         assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
         assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
@@ -421,17 +684,44 @@ public class RocksDBWindowStoreTest {
         setCurrentTime(startTime);
         windowStore.put(0, "zero");
 
-        assertEquals(asList("zero"), toList(windowStore.fetch(0, ofEpochMilli(startTime - windowSize), ofEpochMilli(startTime + windowSize))));
+        assertEquals(
+            Collections.singletonList("zero"),
+            toList(windowStore.fetch(0, ofEpochMilli(startTime - windowSize), ofEpochMilli(startTime + windowSize))));
 
         windowStore.put(0, "zero");
         windowStore.put(0, "zero+");
         windowStore.put(0, "zero++");
 
-        assertEquals(asList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, ofEpochMilli(startTime - windowSize), ofEpochMilli(startTime + windowSize))));
-        assertEquals(asList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, ofEpochMilli(startTime + 1L - windowSize), ofEpochMilli(startTime + 1L + windowSize))));
-        assertEquals(asList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, ofEpochMilli(startTime + 2L - windowSize), ofEpochMilli(startTime + 2L + windowSize))));
-        assertEquals(asList("zero", "zero", "zero+", "zero++"), toList(windowStore.fetch(0, ofEpochMilli(startTime + 3L - windowSize), ofEpochMilli(startTime + 3L + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(0, ofEpochMilli(startTime + 4L - windowSize), ofEpochMilli(startTime + 4L + windowSize))));
+        assertEquals(
+            asList("zero", "zero", "zero+", "zero++"),
+            toList(windowStore.fetch(
+                0,
+                ofEpochMilli(startTime - windowSize),
+                ofEpochMilli(startTime + windowSize))));
+        assertEquals(
+            asList("zero", "zero", "zero+", "zero++"),
+            toList(windowStore.fetch(
+                0,
+                ofEpochMilli(startTime + 1L - windowSize),
+                ofEpochMilli(startTime + 1L + windowSize))));
+        assertEquals(
+            asList("zero", "zero", "zero+", "zero++"),
+            toList(windowStore.fetch(
+                0,
+                ofEpochMilli(startTime + 2L - windowSize),
+                ofEpochMilli(startTime + 2L + windowSize))));
+        assertEquals(
+            asList("zero", "zero", "zero+", "zero++"),
+            toList(windowStore.fetch(
+                0,
+                ofEpochMilli(startTime + 3L - windowSize),
+                ofEpochMilli(startTime + 3L + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                0,
+                ofEpochMilli(startTime + 4L - windowSize),
+                ofEpochMilli(startTime + 4L + windowSize))));
 
         // Flush the store and verify all current entries were properly flushed ...
         windowStore.flush();
@@ -477,7 +767,6 @@ public class RocksDBWindowStoreTest {
             segmentDirs(baseDir)
         );
 
-
         setCurrentTime(startTime + increment * 5);
         windowStore.put(5, "five");
         assertEquals(
@@ -489,12 +778,42 @@ public class RocksDBWindowStoreTest {
             segmentDirs(baseDir)
         );
 
-        assertEquals(asList("zero"), toList(windowStore.fetch(0, ofEpochMilli(startTime - windowSize), ofEpochMilli(startTime + windowSize))));
-        assertEquals(asList("one"), toList(windowStore.fetch(1, ofEpochMilli(startTime + increment - windowSize), ofEpochMilli(startTime + increment + windowSize))));
-        assertEquals(asList("two"), toList(windowStore.fetch(2, ofEpochMilli(startTime + increment * 2 - windowSize), ofEpochMilli(startTime + increment * 2 + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(3, ofEpochMilli(startTime + increment * 3 - windowSize), ofEpochMilli(startTime + increment * 3 + windowSize))));
-        assertEquals(asList("four"), toList(windowStore.fetch(4, ofEpochMilli(startTime + increment * 4 - windowSize), ofEpochMilli(startTime + increment * 4 + windowSize))));
-        assertEquals(asList("five"), toList(windowStore.fetch(5, ofEpochMilli(startTime + increment * 5 - windowSize), ofEpochMilli(startTime + increment * 5 + windowSize))));
+        assertEquals(
+            Collections.singletonList("zero"),
+            toList(windowStore.fetch(
+                0,
+                ofEpochMilli(startTime - windowSize),
+                ofEpochMilli(startTime + windowSize))));
+        assertEquals(
+            Collections.singletonList("one"),
+            toList(windowStore.fetch(
+                1,
+                ofEpochMilli(startTime + increment - windowSize),
+                ofEpochMilli(startTime + increment + windowSize))));
+        assertEquals(
+            Collections.singletonList("two"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + increment * 2 - windowSize),
+                ofEpochMilli(startTime + increment * 2 + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                3,
+                ofEpochMilli(startTime + increment * 3 - windowSize),
+                ofEpochMilli(startTime + increment * 3 + windowSize))));
+        assertEquals(
+            Collections.singletonList("four"),
+            toList(windowStore.fetch(
+                4,
+                ofEpochMilli(startTime + increment * 4 - windowSize),
+                ofEpochMilli(startTime + increment * 4 + windowSize))));
+        assertEquals(
+            Collections.singletonList("five"),
+            toList(windowStore.fetch(
+                5,
+                ofEpochMilli(startTime + increment * 5 - windowSize),
+                ofEpochMilli(startTime + increment * 5 + windowSize))));
 
         setCurrentTime(startTime + increment * 6);
         windowStore.put(6, "six");
@@ -507,15 +826,47 @@ public class RocksDBWindowStoreTest {
             segmentDirs(baseDir)
         );
 
-
-        assertEquals(asList(), toList(windowStore.fetch(0, ofEpochMilli(startTime - windowSize), ofEpochMilli(startTime + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(1, ofEpochMilli(startTime + increment - windowSize), ofEpochMilli(startTime + increment + windowSize))));
-        assertEquals(asList("two"), toList(windowStore.fetch(2, ofEpochMilli(startTime + increment * 2 - windowSize), ofEpochMilli(startTime + increment * 2 + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(3, ofEpochMilli(startTime + increment * 3 - windowSize), ofEpochMilli(startTime + increment * 3 + windowSize))));
-        assertEquals(asList("four"), toList(windowStore.fetch(4, ofEpochMilli(startTime + increment * 4 - windowSize), ofEpochMilli(startTime + increment * 4 + windowSize))));
-        assertEquals(asList("five"), toList(windowStore.fetch(5, ofEpochMilli(startTime + increment * 5 - windowSize), ofEpochMilli(startTime + increment * 5 + windowSize))));
-        assertEquals(asList("six"), toList(windowStore.fetch(6, ofEpochMilli(startTime + increment * 6 - windowSize), ofEpochMilli(startTime + increment * 6 + windowSize))));
-
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                0,
+                ofEpochMilli(startTime - windowSize),
+                ofEpochMilli(startTime + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                1,
+                ofEpochMilli(startTime + increment - windowSize),
+                ofEpochMilli(startTime + increment + windowSize))));
+        assertEquals(
+            Collections.singletonList("two"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + increment * 2 - windowSize),
+                ofEpochMilli(startTime + increment * 2 + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                3,
+                ofEpochMilli(startTime + increment * 3 - windowSize),
+                ofEpochMilli(startTime + increment * 3 + windowSize))));
+        assertEquals(
+            Collections.singletonList("four"),
+            toList(windowStore.fetch(
+                4,
+                ofEpochMilli(startTime + increment * 4 - windowSize),
+                ofEpochMilli(startTime + increment * 4 + windowSize))));
+        assertEquals(
+            Collections.singletonList("five"),
+            toList(windowStore.fetch(
+                5,
+                ofEpochMilli(startTime + increment * 5 - windowSize),
+                ofEpochMilli(startTime + increment * 5 + windowSize))));
+        assertEquals(
+            Collections.singletonList("six"),
+            toList(windowStore.fetch(
+                6,
+                ofEpochMilli(startTime + increment * 6 - windowSize),
+                ofEpochMilli(startTime + increment * 6 + windowSize))));
 
         setCurrentTime(startTime + increment * 7);
         windowStore.put(7, "seven");
@@ -528,14 +879,54 @@ public class RocksDBWindowStoreTest {
             segmentDirs(baseDir)
         );
 
-        assertEquals(asList(), toList(windowStore.fetch(0, ofEpochMilli(startTime - windowSize), ofEpochMilli(startTime + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(1, ofEpochMilli(startTime + increment - windowSize), ofEpochMilli(startTime + increment + windowSize))));
-        assertEquals(asList("two"), toList(windowStore.fetch(2, ofEpochMilli(startTime + increment * 2 - windowSize), ofEpochMilli(startTime + increment * 2 + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(3, ofEpochMilli(startTime + increment * 3 - windowSize), ofEpochMilli(startTime + increment * 3 + windowSize))));
-        assertEquals(asList("four"), toList(windowStore.fetch(4, ofEpochMilli(startTime + increment * 4 - windowSize), ofEpochMilli(startTime + increment * 4 + windowSize))));
-        assertEquals(asList("five"), toList(windowStore.fetch(5, ofEpochMilli(startTime + increment * 5 - windowSize), ofEpochMilli(startTime + increment * 5 + windowSize))));
-        assertEquals(asList("six"), toList(windowStore.fetch(6, ofEpochMilli(startTime + increment * 6 - windowSize), ofEpochMilli(startTime + increment * 6 + windowSize))));
-        assertEquals(asList("seven"), toList(windowStore.fetch(7, ofEpochMilli(startTime + increment * 7 - windowSize), ofEpochMilli(startTime + increment * 7 + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                0,
+                ofEpochMilli(startTime - windowSize),
+                ofEpochMilli(startTime + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                1,
+                ofEpochMilli(startTime + increment - windowSize),
+                ofEpochMilli(startTime + increment + windowSize))));
+        assertEquals(
+            Collections.singletonList("two"),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + increment * 2 - windowSize),
+                ofEpochMilli(startTime + increment * 2 + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                3,
+                ofEpochMilli(startTime + increment * 3 - windowSize),
+                ofEpochMilli(startTime + increment * 3 + windowSize))));
+        assertEquals(
+            Collections.singletonList("four"),
+            toList(windowStore.fetch(
+                4,
+                ofEpochMilli(startTime + increment * 4 - windowSize),
+                ofEpochMilli(startTime + increment * 4 + windowSize))));
+        assertEquals(
+            Collections.singletonList("five"),
+            toList(windowStore.fetch(
+                5,
+                ofEpochMilli(startTime + increment * 5 - windowSize),
+                ofEpochMilli(startTime + increment * 5 + windowSize))));
+        assertEquals(
+            Collections.singletonList("six"),
+            toList(windowStore.fetch(
+                6,
+                ofEpochMilli(startTime + increment * 6 - windowSize),
+                ofEpochMilli(startTime + increment * 6 + windowSize))));
+        assertEquals(
+            Collections.singletonList("seven"),
+            toList(windowStore.fetch(
+                7,
+                ofEpochMilli(startTime + increment * 7 - windowSize),
+                ofEpochMilli(startTime + increment * 7 + windowSize))));
 
         setCurrentTime(startTime + increment * 8);
         windowStore.put(8, "eight");
@@ -548,16 +939,60 @@ public class RocksDBWindowStoreTest {
             segmentDirs(baseDir)
         );
 
-
-        assertEquals(asList(), toList(windowStore.fetch(0, ofEpochMilli(startTime - windowSize), ofEpochMilli(startTime + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(1, ofEpochMilli(startTime + increment - windowSize), ofEpochMilli(startTime + increment + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(2, ofEpochMilli(startTime + increment * 2 - windowSize), ofEpochMilli(startTime + increment * 2 + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(3, ofEpochMilli(startTime + increment * 3 - windowSize), ofEpochMilli(startTime + increment * 3 + windowSize))));
-        assertEquals(asList("four"), toList(windowStore.fetch(4, ofEpochMilli(startTime + increment * 4 - windowSize), ofEpochMilli(startTime + increment * 4 + windowSize))));
-        assertEquals(asList("five"), toList(windowStore.fetch(5, ofEpochMilli(startTime + increment * 5 - windowSize), ofEpochMilli(startTime + increment * 5 + windowSize))));
-        assertEquals(asList("six"), toList(windowStore.fetch(6, ofEpochMilli(startTime + increment * 6 - windowSize), ofEpochMilli(startTime + increment * 6 + windowSize))));
-        assertEquals(asList("seven"), toList(windowStore.fetch(7, ofEpochMilli(startTime + increment * 7 - windowSize), ofEpochMilli(startTime + increment * 7 + windowSize))));
-        assertEquals(asList("eight"), toList(windowStore.fetch(8, ofEpochMilli(startTime + increment * 8 - windowSize), ofEpochMilli(startTime + increment * 8 + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                0,
+                ofEpochMilli(startTime - windowSize),
+                ofEpochMilli(startTime + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                1,
+                ofEpochMilli(startTime + increment - windowSize),
+                ofEpochMilli(startTime + increment + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + increment * 2 - windowSize),
+                ofEpochMilli(startTime + increment * 2 + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                3,
+                ofEpochMilli(startTime + increment * 3 - windowSize),
+                ofEpochMilli(startTime + increment * 3 + windowSize))));
+        assertEquals(
+            Collections.singletonList("four"),
+            toList(windowStore.fetch(
+                4,
+                ofEpochMilli(startTime + increment * 4 - windowSize),
+                ofEpochMilli(startTime + increment * 4 + windowSize))));
+        assertEquals(
+            Collections.singletonList("five"),
+            toList(windowStore.fetch(
+                5,
+                ofEpochMilli(startTime + increment * 5 - windowSize),
+                ofEpochMilli(startTime + increment * 5 + windowSize))));
+        assertEquals(
+            Collections.singletonList("six"),
+            toList(windowStore.fetch(
+                6,
+                ofEpochMilli(startTime + increment * 6 - windowSize),
+                ofEpochMilli(startTime + increment * 6 + windowSize))));
+        assertEquals(
+            Collections.singletonList("seven"),
+            toList(windowStore.fetch(
+                7,
+                ofEpochMilli(startTime + increment * 7 - windowSize),
+                ofEpochMilli(startTime + increment * 7 + windowSize))));
+        assertEquals(
+            Collections.singletonList("eight"),
+            toList(windowStore.fetch(
+                8,
+                ofEpochMilli(startTime + increment * 8 - windowSize),
+                ofEpochMilli(startTime + increment * 8 + windowSize))));
 
         // check segment directories
         windowStore.flush();
@@ -574,7 +1009,7 @@ public class RocksDBWindowStoreTest {
     }
 
     @Test
-    public void testRestore() throws IOException {
+    public void testRestore() throws Exception {
         final long startTime = segmentInterval * 2;
         final long increment = segmentInterval / 2;
 
@@ -605,32 +1040,125 @@ public class RocksDBWindowStoreTest {
         Utils.delete(baseDir);
 
         windowStore = createWindowStore(context, false);
-        assertEquals(asList(), toList(windowStore.fetch(0, ofEpochMilli(startTime - windowSize), ofEpochMilli(startTime + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(1, ofEpochMilli(startTime + increment - windowSize), ofEpochMilli(startTime + increment + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(2, ofEpochMilli(startTime + increment * 2 - windowSize), ofEpochMilli(startTime + increment * 2 + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(3, ofEpochMilli(startTime + increment * 3 - windowSize), ofEpochMilli(startTime + increment * 3 + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(4, ofEpochMilli(startTime + increment * 4 - windowSize), ofEpochMilli(startTime + increment * 4 + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(5, ofEpochMilli(startTime + increment * 5 - windowSize), ofEpochMilli(startTime + increment * 5 + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(6, ofEpochMilli(startTime + increment * 6 - windowSize), ofEpochMilli(startTime + increment * 6 + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(7, ofEpochMilli(startTime + increment * 7 - windowSize), ofEpochMilli(startTime + increment * 7 + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(8, ofEpochMilli(startTime + increment * 8 - windowSize), ofEpochMilli(startTime + increment * 8 + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                0,
+                ofEpochMilli(startTime - windowSize),
+                ofEpochMilli(startTime + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                1,
+                ofEpochMilli(startTime + increment - windowSize),
+                ofEpochMilli(startTime + increment + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + increment * 2 - windowSize),
+                ofEpochMilli(startTime + increment * 2 + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                3,
+                ofEpochMilli(startTime + increment * 3 - windowSize),
+                ofEpochMilli(startTime + increment * 3 + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                4,
+                ofEpochMilli(startTime + increment * 4 - windowSize),
+                ofEpochMilli(startTime + increment * 4 + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                5,
+                ofEpochMilli(startTime + increment * 5 - windowSize),
+                ofEpochMilli(startTime + increment * 5 + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                6,
+                ofEpochMilli(startTime + increment * 6 - windowSize),
+                ofEpochMilli(startTime + increment * 6 + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                7,
+                ofEpochMilli(startTime + increment * 7 - windowSize),
+                ofEpochMilli(startTime + increment * 7 + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                8,
+                ofEpochMilli(startTime + increment * 8 - windowSize),
+                ofEpochMilli(startTime + increment * 8 + windowSize))));
 
         context.restore(windowName, changeLog);
 
-        assertEquals(asList(), toList(windowStore.fetch(0, ofEpochMilli(startTime - windowSize), ofEpochMilli(startTime + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(1, ofEpochMilli(startTime + increment - windowSize), ofEpochMilli(startTime + increment + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(2, ofEpochMilli(startTime + increment * 2 - windowSize), ofEpochMilli(startTime + increment * 2 + windowSize))));
-        assertEquals(asList(), toList(windowStore.fetch(3, ofEpochMilli(startTime + increment * 3 - windowSize), ofEpochMilli(startTime + increment * 3 + windowSize))));
-        assertEquals(asList("four"), toList(windowStore.fetch(4, ofEpochMilli(startTime + increment * 4 - windowSize), ofEpochMilli(startTime + increment * 4 + windowSize))));
-        assertEquals(asList("five"), toList(windowStore.fetch(5, ofEpochMilli(startTime + increment * 5 - windowSize), ofEpochMilli(startTime + increment * 5 + windowSize))));
-        assertEquals(asList("six"), toList(windowStore.fetch(6, ofEpochMilli(startTime + increment * 6 - windowSize), ofEpochMilli(startTime + increment * 6 + windowSize))));
-        assertEquals(asList("seven"), toList(windowStore.fetch(7, ofEpochMilli(startTime + increment * 7 - windowSize), ofEpochMilli(startTime + increment * 7 + windowSize))));
-        assertEquals(asList("eight"), toList(windowStore.fetch(8, ofEpochMilli(startTime + increment * 8 - windowSize), ofEpochMilli(startTime + increment * 8 + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                0,
+                ofEpochMilli(startTime - windowSize),
+                ofEpochMilli(startTime + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                1,
+                ofEpochMilli(startTime + increment - windowSize),
+                ofEpochMilli(startTime + increment + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                2,
+                ofEpochMilli(startTime + increment * 2 - windowSize),
+                ofEpochMilli(startTime + increment * 2 + windowSize))));
+        assertEquals(
+            Collections.emptyList(),
+            toList(windowStore.fetch(
+                3,
+                ofEpochMilli(startTime + increment * 3 - windowSize),
+                ofEpochMilli(startTime + increment * 3 + windowSize))));
+        assertEquals(
+            Collections.singletonList("four"),
+            toList(windowStore.fetch(
+                4,
+                ofEpochMilli(startTime + increment * 4 - windowSize),
+                ofEpochMilli(startTime + increment * 4 + windowSize))));
+        assertEquals(
+            Collections.singletonList("five"),
+            toList(windowStore.fetch(
+                5,
+                ofEpochMilli(startTime + increment * 5 - windowSize),
+                ofEpochMilli(startTime + increment * 5 + windowSize))));
+        assertEquals(
+            Collections.singletonList("six"),
+            toList(windowStore.fetch(
+                6,
+                ofEpochMilli(startTime + increment * 6 - windowSize),
+                ofEpochMilli(startTime + increment * 6 + windowSize))));
+        assertEquals(
+            Collections.singletonList("seven"),
+            toList(windowStore.fetch(
+                7,
+                ofEpochMilli(startTime + increment * 7 - windowSize),
+                ofEpochMilli(startTime + increment * 7 + windowSize))));
+        assertEquals(
+            Collections.singletonList("eight"),
+            toList(windowStore.fetch(
+                8,
+                ofEpochMilli(startTime + increment * 8 - windowSize),
+                ofEpochMilli(startTime + increment * 8 + windowSize))));
 
         // check segment directories
         windowStore.flush();
         assertEquals(
-            Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
+            Utils.mkSet(
+                segments.segmentName(4L),
+                segments.segmentName(5L),
+                segments.segmentName(6L)),
             segmentDirs(baseDir)
         );
     }
@@ -730,7 +1258,10 @@ public class RocksDBWindowStoreTest {
         context.setStreamTime(segmentInterval * 6L);
         windowStore = createWindowStore(context, false);
 
-        final List<String> expected = asList(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L));
+        final List<String> expected = asList(
+            segments.segmentName(4L),
+            segments.segmentName(5L),
+            segments.segmentName(6L));
         expected.sort(String::compareTo);
 
         final List<String> actual = Utils.toList(segmentDirs(baseDir).iterator());
@@ -745,7 +1276,10 @@ public class RocksDBWindowStoreTest {
         }
 
         assertEquals(
-            Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
+            Utils.mkSet(
+                segments.segmentName(4L),
+                segments.segmentName(5L),
+                segments.segmentName(6L)),
             segmentDirs(baseDir)
         );
     }
@@ -836,7 +1370,12 @@ public class RocksDBWindowStoreTest {
     @Test
     public void shouldNoNullPointerWhenSerdeDoesNotHandleNull() {
         windowStore = new RocksDBWindowStore<>(
-            new RocksDBSegmentedBytesStore(windowName, "metrics-scope", retentionPeriod, segmentInterval, new WindowKeySchema()),
+            new RocksDBSegmentedBytesStore(
+                windowName,
+                "metrics-scope",
+                retentionPeriod,
+                segmentInterval,
+                new WindowKeySchema()),
             Serdes.Integer(),
             new SerdeThatDoesntHandleNull(),
             false,
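
The fetch assertions rewritten in the hunks above all follow one pattern: zero- and one-element expectations move from the varargs Arrays.asList() to Collections.emptyList() and Collections.singletonList(), which state the expected cardinality directly and skip the varargs array. A minimal standalone sketch of that pattern (the class and variable names below are illustrative, not part of the commit):

import java.util.Collections;
import java.util.List;

import static java.util.Arrays.asList;

public class SingletonVsAsListSketch {
    public static void main(final String[] args) {
        // Before: zero- and one-element expectations built via the varargs asList().
        final List<String> emptyBefore = asList();
        final List<String> oneBefore = asList("eight");

        // After: intent-revealing factories that avoid the varargs array.
        final List<String> emptyAfter = Collections.emptyList();
        final List<String> oneAfter = Collections.singletonList("eight");

        // List equality compares elements only, so assertEquals behaves the same either way.
        System.out.println(emptyBefore.equals(emptyAfter)); // true
        System.out.println(oneBefore.equals(oneAfter));     // true
    }
}
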
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
index 8045964..db5f083 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
 import org.apache.kafka.test.TestUtils;
@@ -32,6 +31,7 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 import static org.junit.Assert.assertEquals;
@@ -42,31 +42,27 @@ public class SegmentIteratorTest {
 
     private final Segment segmentOne = new Segment("one", "one", 0);
     private final Segment segmentTwo = new Segment("two", "window", 1);
-    private final HasNextCondition hasNextCondition = new HasNextCondition() {
-        @Override
-        public boolean hasNext(final KeyValueIterator iterator) {
-            return iterator.hasNext();
-        }
-    };
+    private final HasNextCondition hasNextCondition = Iterator::hasNext;
 
-    private InternalMockProcessorContext context;
     private SegmentIterator iterator = null;
 
     @Before
     public void before() {
-        context = new InternalMockProcessorContext(
-                TestUtils.tempDirectory(),
-                Serdes.String(),
-                Serdes.String(),
-                new NoOpRecordCollector(),
-                new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
+        final InternalMockProcessorContext context = new InternalMockProcessorContext(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.String(),
+            new NoOpRecordCollector(),
+            new ThreadCache(
+                new LogContext("testCache "),
+                0,
+                new MockStreamsMetrics(new Metrics())));
         segmentOne.openDB(context);
         segmentTwo.openDB(context);
         segmentOne.put(Bytes.wrap("a".getBytes()), "1".getBytes());
         segmentOne.put(Bytes.wrap("b".getBytes()), "2".getBytes());
         segmentTwo.put(Bytes.wrap("c".getBytes()), "3".getBytes());
         segmentTwo.put(Bytes.wrap("d".getBytes()), "4".getBytes());
-
     }
 
     @After
@@ -81,10 +77,11 @@ public class SegmentIteratorTest {
 
     @Test
     public void shouldIterateOverAllSegments() {
-        iterator = new SegmentIterator(Arrays.asList(segmentOne, segmentTwo).iterator(),
-                hasNextCondition,
-                Bytes.wrap("a".getBytes()),
-                Bytes.wrap("z".getBytes()));
+        iterator = new SegmentIterator(
+            Arrays.asList(segmentOne, segmentTwo).iterator(),
+            hasNextCondition,
+            Bytes.wrap("a".getBytes()),
+            Bytes.wrap("z".getBytes()));
 
         assertTrue(iterator.hasNext());
         assertEquals("a", new String(iterator.peekNextKey().get()));
@@ -107,11 +104,11 @@ public class SegmentIteratorTest {
 
     @Test
     public void shouldNotThrowExceptionOnHasNextWhenStoreClosed() {
-        iterator = new SegmentIterator(Collections.singletonList(segmentOne).iterator(),
-                                       hasNextCondition,
-                                       Bytes.wrap("a".getBytes()),
-                                       Bytes.wrap("z".getBytes()));
-
+        iterator = new SegmentIterator(
+            Collections.singletonList(segmentOne).iterator(),
+            hasNextCondition,
+            Bytes.wrap("a".getBytes()),
+            Bytes.wrap("z".getBytes()));
 
         iterator.currentIterator = segmentOne.all();
         segmentOne.close();
@@ -120,10 +117,11 @@ public class SegmentIteratorTest {
 
     @Test
     public void shouldOnlyIterateOverSegmentsInRange() {
-        iterator = new SegmentIterator(Arrays.asList(segmentOne, segmentTwo).iterator(),
-                hasNextCondition,
-                Bytes.wrap("a".getBytes()),
-                Bytes.wrap("b".getBytes()));
+        iterator = new SegmentIterator(
+            Arrays.asList(segmentOne, segmentTwo).iterator(),
+            hasNextCondition,
+            Bytes.wrap("a".getBytes()),
+            Bytes.wrap("b".getBytes()));
 
         assertTrue(iterator.hasNext());
         assertEquals("a", new String(iterator.peekNextKey().get()));
@@ -138,20 +136,22 @@ public class SegmentIteratorTest {
 
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowNoSuchElementOnPeekNextKeyIfNoNext() {
-        iterator = new SegmentIterator(Arrays.asList(segmentOne, segmentTwo).iterator(),
-                hasNextCondition,
-                Bytes.wrap("f".getBytes()),
-                Bytes.wrap("h".getBytes()));
+        iterator = new SegmentIterator(
+            Arrays.asList(segmentOne, segmentTwo).iterator(),
+            hasNextCondition,
+            Bytes.wrap("f".getBytes()),
+            Bytes.wrap("h".getBytes()));
 
         iterator.peekNextKey();
     }
 
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowNoSuchElementOnNextIfNoNext() {
-        iterator = new SegmentIterator(Arrays.asList(segmentOne, segmentTwo).iterator(),
-                hasNextCondition,
-                Bytes.wrap("f".getBytes()),
-                Bytes.wrap("h".getBytes()));
+        iterator = new SegmentIterator(
+            Arrays.asList(segmentOne, segmentTwo).iterator(),
+            hasNextCondition,
+            Bytes.wrap("f".getBytes()),
+            Bytes.wrap("h".getBytes()));
 
         iterator.next();
     }
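
The SegmentIteratorTest change above replaces a one-method anonymous class with the method reference Iterator::hasNext. A self-contained sketch of the same transformation, using a hypothetical NextCondition interface in place of Kafka's internal HasNextCondition:

import java.util.Arrays;
import java.util.Iterator;

public class MethodReferenceSketch {

    // Stand-in for a single-abstract-method interface such as HasNextCondition.
    @FunctionalInterface
    interface NextCondition {
        boolean hasNext(Iterator<?> iterator);
    }

    public static void main(final String[] args) {
        // Before: an anonymous inner class that only delegates to hasNext().
        final NextCondition verbose = new NextCondition() {
            @Override
            public boolean hasNext(final Iterator<?> iterator) {
                return iterator.hasNext();
            }
        };

        // After: the equivalent unbound method reference.
        final NextCondition concise = Iterator::hasNext;

        System.out.println(verbose.hasNext(Arrays.asList("a", "b").iterator())); // true
        System.out.println(concise.hasNext(Arrays.asList("a", "b").iterator())); // true
    }
}
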
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 7186c28..3e9b3c3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -42,8 +42,14 @@ public class StoreChangeLoggerTest {
     private final Map<Integer, String> logged = new HashMap<>();
     private final Map<Integer, Headers> loggedHeaders = new HashMap<>();
 
-    private final InternalMockProcessorContext context = new InternalMockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
-        new RecordCollectorImpl("StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")) {
+    private final InternalMockProcessorContext context = new InternalMockProcessorContext(
+        StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
+        new RecordCollectorImpl(
+            "StoreChangeLoggerTest",
+            new LogContext("StoreChangeLoggerTest "),
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records")) {
+
             @Override
             public <K1, V1> void send(final String topic,
                                       final K1 key,
@@ -71,7 +77,8 @@ public class StoreChangeLoggerTest {
         }
     );
 
-    private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class));
+    private final StoreChangeLogger<Integer, String> changeLogger =
+        new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class));
 
     @Test
     public void testAddRemove() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 9679429..a10b62d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -75,10 +75,19 @@ public class StreamThreadStateStoreProviderTest {
         final TopologyWrapper topology = new TopologyWrapper();
         topology.addSource("the-source", topicName);
         topology.addProcessor("the-processor", new MockProcessorSupplier(), "the-source");
-        topology.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv-store"), Serdes.String(), Serdes.String()), "the-processor");
+        topology.addStateStore(
+            Stores.keyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("kv-store"),
+                Serdes.String(),
+                Serdes.String()),
+            "the-processor");
         topology.addStateStore(
             Stores.windowStoreBuilder(
-                Stores.persistentWindowStore("window-store", Duration.ofMillis(10L), Duration.ofMillis(2L), false),
+                Stores.persistentWindowStore(
+                    "window-store",
+                    Duration.ofMillis(10L),
+                    Duration.ofMillis(2L),
+                    false),
                 Serdes.String(),
                 Serdes.String()),
             "the-processor"
@@ -101,11 +110,19 @@ public class StreamThreadStateStoreProviderTest {
         tasks = new HashMap<>();
         stateDirectory = new StateDirectory(streamsConfig, new MockTime(), true);
 
-        taskOne = createStreamsTask(streamsConfig, clientSupplier, processorTopology, new TaskId(0, 0));
+        taskOne = createStreamsTask(
+            streamsConfig,
+            clientSupplier,
+            processorTopology,
+            new TaskId(0, 0));
         taskOne.initializeStateStores();
         tasks.put(new TaskId(0, 0), taskOne);
 
-        final StreamTask taskTwo = createStreamsTask(streamsConfig, clientSupplier, processorTopology, new TaskId(0, 1));
+        final StreamTask taskTwo = createStreamsTask(
+            streamsConfig,
+            clientSupplier,
+            processorTopology,
+            new TaskId(0, 1));
         taskTwo.initializeStateStores();
         tasks.put(new TaskId(0, 1), taskTwo);
 
@@ -130,8 +147,7 @@ public class StreamThreadStateStoreProviderTest {
     @Test
     public void shouldFindWindowStores() {
         mockThread(true);
-        final List<ReadOnlyWindowStore<Object, Object>>
-            windowStores =
+        final List<ReadOnlyWindowStore<Object, Object>> windowStores =
             provider.stores("window-store", windowStore());
         assertEquals(2, windowStores.size());
     }
@@ -153,11 +169,11 @@ public class StreamThreadStateStoreProviderTest {
     @Test
     public void shouldReturnEmptyListIfNoStoresFoundWithName() {
         mockThread(true);
-        assertEquals(Collections.emptyList(), provider.stores("not-a-store", QueryableStoreTypes
-            .keyValueStore()));
+        assertEquals(
+            Collections.emptyList(),
+            provider.stores("not-a-store", QueryableStoreTypes.keyValueStore()));
     }
 
-
     @Test
     public void shouldReturnEmptyListIfStoreExistsButIsNotOfTypeValueStore() {
         mockThread(true);
@@ -183,7 +199,11 @@ public class StreamThreadStateStoreProviderTest {
             Collections.singletonList(new TopicPartition(topicName, taskId.partition)),
             topology,
             clientSupplier.consumer,
-            new StoreChangelogReader(clientSupplier.restoreConsumer, Duration.ZERO, new MockStateRestoreListener(), new LogContext("test-stream-task ")),
+            new StoreChangelogReader(
+                clientSupplier.restoreConsumer,
+                Duration.ZERO,
+                new MockStateRestoreListener(),
+                new LogContext("test-stream-task ")),
             streamsConfig,
             new MockStreamsMetrics(metrics),
             stateDirectory,
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
index 66b20b2..6fa2257 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
@@ -17,16 +17,15 @@
 
 package org.apache.kafka.streams.tests;
 
-import java.time.Duration;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
@@ -36,6 +35,7 @@ import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -44,7 +44,8 @@ public class StreamsStandByReplicaTest {
 
     public static void main(final String[] args) throws IOException {
         if (args.length < 2) {
-            System.err.println("StreamsStandByReplicaTest are expecting two parameters: propFile, additionalConfigs; but only see " + args.length + " parameter");
+            System.err.println("StreamsStandByReplicaTest are expecting two parameters: " +
+                "propFile, additionalConfigs; but only see " + args.length + " parameter");
             System.exit(1);
         }
 
@@ -83,7 +84,11 @@ public class StreamsStandByReplicaTest {
         final String sinkTopic2 = updated.remove("sinkTopic2");
 
         if (sourceTopic == null || sinkTopic1 == null || sinkTopic2 == null) {
-            System.err.println(String.format("one or more required topics null sourceTopic[%s], sinkTopic1[%s], sinkTopic2[%s]", sourceTopic, sinkTopic1, sinkTopic2));
+            System.err.println(String.format(
+                "one or more required topics null sourceTopic[%s], sinkTopic1[%s], sinkTopic2[%s]",
+                sourceTopic,
+                sinkTopic1,
+                sinkTopic2));
             System.err.flush();
             System.exit(1);
         }
@@ -109,42 +114,32 @@ public class StreamsStandByReplicaTest {
         final KeyValueBytesStoreSupplier persistentStoreSupplier = Stores.persistentKeyValueStore(persistentMemoryStoreName);
 
         final Serde<String> stringSerde = Serdes.String();
+        final ValueMapper<Long, String> countMapper = Object::toString;
 
         final KStream<String, String> inputStream = builder.stream(sourceTopic, Consumed.with(stringSerde, stringSerde));
 
-        final ValueMapper<Long, String> countMapper = new ValueMapper<Long, String>() {
-            @Override
-            public String apply(final Long value) {
-                return value.toString();
-            }
-        };
-
-        inputStream.groupByKey().count(Materialized.<String, Long>as(inMemoryStoreSupplier)).toStream().mapValues(countMapper)
+        inputStream.groupByKey().count(Materialized.as(inMemoryStoreSupplier)).toStream().mapValues(countMapper)
             .to(sinkTopic1, Produced.with(stringSerde, stringSerde));
 
-        inputStream.groupByKey().count(Materialized.<String, Long>as(persistentStoreSupplier)).toStream().mapValues(countMapper)
+        inputStream.groupByKey().count(Materialized.as(persistentStoreSupplier)).toStream().mapValues(countMapper)
             .to(sinkTopic2, Produced.with(stringSerde, stringSerde));
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), streamsProperties);
 
-        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(final Thread t, final Throwable e) {
-                System.err.println("FATAL: An unexpected exception " + e);
-                e.printStackTrace(System.err);
-                System.err.flush();
-                shutdown(streams);
-            }
+        streams.setUncaughtExceptionHandler((t, e) -> {
+            System.err.println("FATAL: An unexpected exception " + e);
+            e.printStackTrace(System.err);
+            System.err.flush();
+            shutdown(streams);
         });
 
-        streams.setStateListener(new KafkaStreams.StateListener() {
-            @Override
-            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
-                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
-                    final Set<ThreadMetadata> threadMetadata = streams.localThreadsMetadata();
-                    for (final ThreadMetadata threadMetadatum : threadMetadata) {
-                        System.out.println("ACTIVE_TASKS:" + threadMetadatum.activeTasks().size() + " STANDBY_TASKS:" + threadMetadatum.standbyTasks().size());
-                    }
+        streams.setStateListener((newState, oldState) -> {
+            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+                final Set<ThreadMetadata> threadMetadata = streams.localThreadsMetadata();
+                for (final ThreadMetadata threadMetadatum : threadMetadata) {
+                    System.out.println(
+                        "ACTIVE_TASKS:" + threadMetadatum.activeTasks().size()
+                        + " STANDBY_TASKS:" + threadMetadatum.standbyTasks().size());
                 }
             }
         });
@@ -152,15 +147,10 @@ public class StreamsStandByReplicaTest {
         System.out.println("Start Kafka Streams");
         streams.start();
 
-        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-            @Override
-            public void run() {
-                shutdown(streams);
-                System.out.println("Shut down streams now");
-            }
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            shutdown(streams);
+            System.out.println("Shut down streams now");
         }));
-
-
     }
 
     private static void shutdown(final KafkaStreams streams) {
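
The StreamsStandByReplicaTest hunks apply the same idea to callbacks: anonymous implementations of single-method interfaces such as Thread.UncaughtExceptionHandler, KafkaStreams.StateListener, and Runnable collapse into lambdas with identical behavior. A small sketch using only JDK types (names below are illustrative):

public class LambdaCallbackSketch {
    public static void main(final String[] args) throws InterruptedException {
        // Before: an anonymous class implementing the single-method handler interface.
        final Thread.UncaughtExceptionHandler verbose = new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(final Thread t, final Throwable e) {
                System.err.println("FATAL: An unexpected exception " + e);
            }
        };

        // After: a lambda with identical behavior; the same applies to Runnable shutdown hooks.
        final Thread.UncaughtExceptionHandler concise =
            (t, e) -> System.err.println("FATAL: An unexpected exception " + e);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> System.out.println("Shut down now")));

        // Either handler can be installed; the lambda is used here.
        final Thread worker = new Thread(() -> {
            throw new IllegalStateException("boom");
        });
        worker.setUncaughtExceptionHandler(concise);
        worker.start();
        worker.join();

        // The anonymous-class form remains valid Java; the cleanup is purely stylistic.
        verbose.uncaughtException(worker, new IllegalStateException("demo"));
    }
}
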
diff --git a/streams/src/test/java/org/apache/kafka/test/KTableValueGetterStub.java b/streams/src/test/java/org/apache/kafka/test/KTableValueGetterStub.java
deleted file mode 100644
index 75f6db0..0000000
--- a/streams/src/test/java/org/apache/kafka/test/KTableValueGetterStub.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.test;
-
-import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class KTableValueGetterStub<K, V> implements KTableValueGetter<K, V> {
-
-    private final Map<K, V> data = new HashMap<>();
-
-    @Override
-    public void init(final ProcessorContext context) {
-
-    }
-
-    @Override
-    public V get(final K key) {
-        return data.get(key);
-    }
-
-    public void put(final K key, final V value) {
-        data.put(key, value);
-    }
-
-    public void remove(final K key) {
-        data.remove(key);
-    }
-
-    @Override
-    public void close() {
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockAggregator.java b/streams/src/test/java/org/apache/kafka/test/MockAggregator.java
index 829e011..42416ec 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockAggregator.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockAggregator.java
@@ -24,11 +24,6 @@ public class MockAggregator {
     public final static Aggregator<Object, Object, String> TOSTRING_REMOVER = toStringInstance("-");
 
     public static <K, V> Aggregator<K, V, String> toStringInstance(final String sep) {
-        return new Aggregator<K, V, String>() {
-            @Override
-            public String apply(final K aggKey, final V value, final String aggregate) {
-                return aggregate + sep + value;
-            }
-        };
+        return (aggKey, value, aggregate) -> aggregate + sep + value;
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInitializer.java b/streams/src/test/java/org/apache/kafka/test/MockInitializer.java
index 61987e7..d5d69c8 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInitializer.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInitializer.java
@@ -21,7 +21,6 @@ import org.apache.kafka.streams.kstream.Initializer;
 public class MockInitializer {
 
     private static class StringInit implements Initializer<String> {
-
         @Override
         public String apply() {
             return "0";
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
index e13e144..b3050f4 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessor.java
@@ -16,17 +16,17 @@
  */
 package org.apache.kafka.test;
 
-import java.time.Duration;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
 
+import java.time.Duration;
 import java.util.ArrayList;
 
 import static org.junit.Assert.assertEquals;
 
+@SuppressWarnings("WeakerAccess")
 public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
 
     public final ArrayList<String> processed = new ArrayList<>();
@@ -43,7 +43,8 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
 
     private boolean commitRequested = false;
 
-    public MockProcessor(final PunctuationType punctuationType, final long scheduleInterval) {
+    public MockProcessor(final PunctuationType punctuationType,
+                         final long scheduleInterval) {
         this.punctuationType = punctuationType;
         this.scheduleInterval = scheduleInterval;
     }
@@ -56,9 +57,10 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
     public void init(final ProcessorContext context) {
         super.init(context);
         if (scheduleInterval > 0L) {
-            scheduleCancellable = context.schedule(Duration.ofMillis(scheduleInterval), punctuationType, new Punctuator() {
-                @Override
-                public void punctuate(final long timestamp) {
+            scheduleCancellable = context.schedule(
+                Duration.ofMillis(scheduleInterval),
+                punctuationType,
+                timestamp -> {
                     if (punctuationType == PunctuationType.STREAM_TIME) {
                         assertEquals(timestamp, context().timestamp());
                     }
@@ -67,8 +69,7 @@ public class MockProcessor<K, V> extends AbstractProcessor<K, V> {
 
                     (punctuationType == PunctuationType.STREAM_TIME ? punctuatedStreamTime : punctuatedSystemTime)
                             .add(timestamp);
-                }
-            });
+                });
         }
     }
 
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 432b9f8..e8f4fd6 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -184,8 +184,6 @@ public class TopologyTestDriver implements Closeable {
     private final GlobalStateUpdateTask globalStateTask;
     private final GlobalStateManager globalStateManager;
 
-    private final InternalProcessorContext context;
-
     private final StateDirectory stateDirectory;
     private final Metrics metrics;
     final ProcessorTopology processorTopology;
@@ -353,11 +351,14 @@ public class TopologyTestDriver implements Closeable {
                 metrics.sensor("dummy"));
             task.initializeStateStores();
             task.initializeTopology();
-            context = (InternalProcessorContext) task.context();
-            context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, ProcessorContextImpl.NONEXIST_TOPIC, new RecordHeaders()));
+            ((InternalProcessorContext) task.context()).setRecordContext(new ProcessorRecordContext(
+                0L,
+                -1L,
+                -1,
+                ProcessorContextImpl.NONEXIST_TOPIC,
+                new RecordHeaders()));
         } else {
             task = null;
-            context = null;
         }
         eosEnabled = streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE);
     }
@@ -691,10 +692,6 @@ public class TopologyTestDriver implements Closeable {
         stateDirectory.clean();
     }
 
-    private Producer<byte[], byte[]> get() {
-        return producer;
-    }
-
     static class MockTime implements Time {
         private final AtomicLong timeMs;
         private final AtomicLong highResTimeNs;
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 9f1c3a7..84aaa8a 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams;
 
-import java.io.File;
-import java.time.Duration;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.Header;
@@ -57,6 +55,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.io.File;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -118,8 +118,6 @@ public class TopologyTestDriverTest {
         new StringSerializer(),
         new LongSerializer());
 
-    private final boolean eosEnabled;
-
     @Parameterized.Parameters(name = "Eos enabled = {0}")
     public static Collection<Object[]> data() {
         final List<Object[]> values = new ArrayList<>();
@@ -130,7 +128,6 @@ public class TopologyTestDriverTest {
     }
 
     public TopologyTestDriverTest(final boolean eosEnabled) {
-        this.eosEnabled = eosEnabled;
         if (eosEnabled) {
             config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
         }

