kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: mj...@apache.org
Subject: [kafka] branch trunk updated: MINOR: code cleanup (#6053)
Date: Wed, 09 Jan 2019 17:03:32 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1c7bf4e  MINOR: code cleanup (#6053)
1c7bf4e is described below

commit 1c7bf4e4976e2b58826f68f1abe8ffc9fd41692c
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Wed Jan 9 18:03:16 2019 +0100

    MINOR: code cleanup (#6053)
    
    Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Ryanne Dolan <ryannedolan@gmail.com>, Guozhang Wang <guozhang@confluent.io>
---
 gradle/dependencies.gradle                         |   4 +-
 .../org/apache/kafka/streams/StreamsBuilder.java   |   8 +-
 .../kafka/streams/kstream/KGroupedStream.java      |   1 -
 .../streams/kstream/internals/AbstractStream.java  |  53 +++---
 .../internals/ForwardingCacheFlushListener.java    |   4 +-
 .../kstream/internals/KTableKTableInnerJoin.java   |   7 +-
 .../internals/KeyValueStoreMaterializer.java       |   7 +-
 .../internals/graph/StreamStreamJoinNode.java      |  22 +--
 .../internals/graph/TableProcessorNode.java        |   6 +-
 .../kstream/internals/graph/TableSourceNode.java   |  14 +-
 .../internals/InternalTopologyBuilder.java         |  69 ++++----
 .../processor/internals/ProcessorContextImpl.java  | 183 ++++++++++++++-------
 .../processor/internals/ProcessorStateManager.java |  19 ++-
 .../internals/assignment/SubscriptionInfo.java     |   6 +-
 .../kafka/streams/state/QueryableStoreTypes.java   |  40 +++--
 .../apache/kafka/streams/state/WindowStore.java    |  11 +-
 .../state/internals/CachingKeyValueStore.java      |  27 +--
 .../internals/ChangeLoggingKeyValueBytesStore.java |  21 +--
 .../internals/InMemoryKeyValueLoggedStore.java     | 133 ---------------
 .../state/internals/InMemoryKeyValueStore.java     |   4 -
 .../state/internals/KeyValueStoreBuilder.java      |   1 +
 .../streams/state/internals/MemoryLRUCache.java    |  27 ++-
 .../state/internals/MeteredKeyValueStore.java      |  32 ++--
 .../state/internals/MeteredSessionStore.java       |  64 +++----
 .../state/internals/MeteredWindowStore.java        |  62 ++++---
 .../state/internals/QueryableStoreProvider.java    |   8 +-
 .../streams/state/internals/RocksDBStore.java      |   9 +-
 .../streams/state/internals/StoreChangeLogger.java |  12 +-
 .../state/internals/WindowStoreBuilder.java        |  23 +--
 .../state/internals/WrappingStoreProvider.java     |   8 +-
 .../assignment/StickyTaskAssignorTest.java         |   4 +-
 31 files changed, 435 insertions(+), 454 deletions(-)

diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 2c2488f..c7faec5 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -57,7 +57,7 @@ versions += [
   jetty: "9.4.12.v20180830",
   jersey: "2.27",
   jmh: "1.21",
-  hamcrest: "1.3",
+  hamcrest: "2.1",
   log4j: "1.2.17",
   scalaLogging: "3.9.0",
   jaxb: "2.3.0",
@@ -119,7 +119,7 @@ libs += [
   jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",
   joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
   junit: "junit:junit:$versions.junit",
-  hamcrest: "org.hamcrest:hamcrest-all:1.3",
+  hamcrest: "org.hamcrest:hamcrest:$versions.hamcrest",
   kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
   kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101",
   kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 14b63e2..442c87c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -113,7 +113,7 @@ public class StreamsBuilder {
      * @return a {@link KStream} for the specified topics
      */
     public synchronized <K, V> KStream<K, V> stream(final Collection<String> topics) {
-        return stream(topics, Consumed.<K, V>with(null, null, null, null));
+        return stream(topics, Consumed.with(null, null, null, null));
     }
 
     /**
@@ -155,7 +155,7 @@ public class StreamsBuilder {
      * @return a {@link KStream} for topics matching the regex pattern.
      */
     public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern) {
-        return stream(topicPattern, Consumed.<K, V>with(null, null));
+        return stream(topicPattern, Consumed.with(null, null));
     }
 
     /**
@@ -250,7 +250,7 @@ public class StreamsBuilder {
      * @return a {@link KTable} for the specified topic
      */
     public synchronized <K, V> KTable<K, V> table(final String topic) {
-        return table(topic, new ConsumedInternal<K, V>());
+        return table(topic, new ConsumedInternal<>());
     }
 
     /**
@@ -356,7 +356,7 @@ public class StreamsBuilder {
      * @return a {@link GlobalKTable} for the specified topic
      */
     public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic) {
-        return globalTable(topic, Consumed.<K, V>with(null, null));
+        return globalTable(topic, Consumed.with(null, null));
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index 7b69e03..05e4ac9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -115,7 +115,6 @@ public interface KGroupedStream<K, V> {
      */
     KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);
 
-
     /**
      * Combine the values of records in this stream by the grouped key.
      * Records with {@code null} key or value are ignored.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index e870751..f087b79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -95,48 +95,35 @@ public abstract class AbstractStream<K, V> {
     }
 
     static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) {
-        return new ValueJoiner<T2, T1, R>() {
-            @Override
-            public R apply(final T2 value2, final T1 value1) {
-                return joiner.apply(value1, value2);
-            }
-        };
+        return (value2, value1) -> joiner.apply(value1, value2);
     }
 
     static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(final ValueMapper<V, VR> valueMapper) {
         Objects.requireNonNull(valueMapper, "valueMapper can't be null");
-        return new ValueMapperWithKey<K, V, VR>() {
-            @Override
-            public VR apply(final K readOnlyKey, final V value) {
-                return valueMapper.apply(value);
-            }
-        };
+        return (readOnlyKey, value) -> valueMapper.apply(value);
     }
 
     static <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> toValueTransformerWithKeySupplier(
         final ValueTransformerSupplier<V, VR> valueTransformerSupplier) {
         Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
-        return new ValueTransformerWithKeySupplier<K, V, VR>() {
-            @Override
-            public ValueTransformerWithKey<K, V, VR> get() {
-                final ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
-                return new ValueTransformerWithKey<K, V, VR>() {
-                    @Override
-                    public void init(final ProcessorContext context) {
-                        valueTransformer.init(context);
-                    }
-
-                    @Override
-                    public VR transform(final K readOnlyKey, final V value) {
-                        return valueTransformer.transform(value);
-                    }
-
-                    @Override
-                    public void close() {
-                        valueTransformer.close();
-                    }
-                };
-            }
+        return () -> {
+            final ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
+            return new ValueTransformerWithKey<K, V, VR>() {
+                @Override
+                public void init(final ProcessorContext context) {
+                    valueTransformer.init(context);
+                }
+
+                @Override
+                public VR transform(final K readOnlyKey, final V value) {
+                    return valueTransformer.transform(value);
+                }
+
+                @Override
+                public void close() {
+                    valueTransformer.close();
+                }
+            };
         };
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
index 4065ced..eee61fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ForwardingCacheFlushListener.java
@@ -30,7 +30,9 @@ class ForwardingCacheFlushListener<K, V> implements CacheFlushListener<K, V> {
     }
 
     @Override
-    public void apply(final K key, final V newValue, final V oldValue) {
+    public void apply(final K key,
+                      final V newValue,
+                      final V oldValue) {
         final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
index a7bbb56..58110b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
@@ -28,12 +28,7 @@ import org.slf4j.LoggerFactory;
 class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
     private static final Logger LOG = LoggerFactory.getLogger(KTableKTableInnerJoin.class);
 
-    private final KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() {
-        @Override
-        public K apply(final K key, final V1 value) {
-            return key;
-        }
-    };
+    private final KeyValueMapper<K, V1, K> keyValueMapper = (key, value) -> key;
 
     KTableKTableInnerJoin(final KTableImpl<K, ?, V1> table1,
                           final KTableImpl<K, ?, V2> table2,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index c8cd35d..67872be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -38,9 +38,10 @@ public class KeyValueStoreMaterializer<K, V> {
             final String name = materialized.storeName();
             supplier = Stores.persistentKeyValueStore(name);
         }
-        final StoreBuilder<KeyValueStore<K, V>> builder = Stores.keyValueStoreBuilder(supplier,
-                                                                                      materialized.keySerde(),
-                                                                                      materialized.valueSerde());
+        final StoreBuilder<KeyValueStore<K, V>> builder = Stores.keyValueStoreBuilder(
+            supplier,
+            materialized.keySerde(),
+            materialized.valueSerde());
 
         if (materialized.loggingEnabled()) {
             builder.withLoggingEnabled(materialized.logConfig());
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
index 46cf3e7..d080c18 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamStreamJoinNode.java
@@ -35,16 +35,16 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
     private final Joined<K, V1, V2> joined;
 
 
-    StreamStreamJoinNode(final String nodeName,
-                         final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner,
-                         final ProcessorParameters<K, V1> joinThisProcessorParameters,
-                         final ProcessorParameters<K, V2> joinOtherProcessParameters,
-                         final ProcessorParameters<K, VR> joinMergeProcessorParameters,
-                         final ProcessorParameters<K, V1> thisWindowedStreamProcessorParameters,
-                         final ProcessorParameters<K, V2> otherWindowedStreamProcessorParameters,
-                         final StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder,
-                         final StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder,
-                         final Joined<K, V1, V2> joined) {
+    private StreamStreamJoinNode(final String nodeName,
+                                 final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner,
+                                 final ProcessorParameters<K, V1> joinThisProcessorParameters,
+                                 final ProcessorParameters<K, V2> joinOtherProcessParameters,
+                                 final ProcessorParameters<K, VR> joinMergeProcessorParameters,
+                                 final ProcessorParameters<K, V1> thisWindowedStreamProcessorParameters,
+                                 final ProcessorParameters<K, V2> otherWindowedStreamProcessorParameters,
+                                 final StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder,
+                                 final StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder,
+                                 final Joined<K, V1, V2> joined) {
 
         super(nodeName,
               valueJoiner,
@@ -89,7 +89,7 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
         topologyBuilder.addStateStore(otherWindowStoreBuilder, otherWindowedStreamProcessorName, thisProcessorName);
     }
 
-    public static <K, V, V1, V2, VR> StreamStreamJoinNodeBuilder<K, V1, V2, VR> streamStreamJoinNodeBuilder() {
+    public static <K, V1, V2, VR> StreamStreamJoinNodeBuilder<K, V1, V2, VR> streamStreamJoinNodeBuilder() {
         return new StreamStreamJoinNodeBuilder<>();
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
index eb3328a..0409c62 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNode.java
@@ -66,7 +66,11 @@ public class TableProcessorNode<K, V, S extends StateStore> extends StreamsGraph
         final boolean shouldMaterialize = materializedInternal != null && materializedInternal.queryableStoreName() != null;
         if (shouldMaterialize) {
             // TODO: we are enforcing this as a keyvalue store, but it should go beyond any type of stores
-            topologyBuilder.addStateStore(new KeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize(), processorName);
+            topologyBuilder.addStateStore(
+                new KeyValueStoreMaterializer<>(
+                    (MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal
+                ).materialize(),
+                processorName);
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
index 6c213a7..53061dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java
@@ -39,13 +39,13 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
     private final String sourceName;
     private final boolean isGlobalKTable;
 
-    TableSourceNode(final String nodeName,
-                    final String sourceName,
-                    final String topic,
-                    final ConsumedInternal<K, V> consumedInternal,
-                    final MaterializedInternal<K, V, ?> materializedInternal,
-                    final ProcessorParameters<K, V> processorParameters,
-                    final boolean isGlobalKTable) {
+    private TableSourceNode(final String nodeName,
+                            final String sourceName,
+                            final String topic,
+                            final ConsumedInternal<K, V> consumedInternal,
+                            final MaterializedInternal<K, V, ?> materializedInternal,
+                            final ProcessorParameters<K, V> processorParameters,
+                            final boolean isGlobalKTable) {
 
         super(nodeName,
               Collections.singletonList(topic),
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index b1f9496..0648fec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -50,13 +50,10 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.regex.Pattern;
 
-
 public class InternalTopologyBuilder {
 
     private static final Logger log = LoggerFactory.getLogger(InternalTopologyBuilder.class);
-
     private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
-
     private static final String[] NO_PREDECESSORS = {};
 
     // node factories in a topological order
@@ -706,13 +703,15 @@ public class InternalTopologyBuilder {
         // in the map; this scenario is possible, for example, that a state store underlying a source KTable is
         // connecting to a join operator whose source topic is not the original KTable's source topic but an internal repartition topic.
 
-        if (stateStoreNameToSourceTopics.containsKey(stateStoreName) || stateStoreNameToSourceRegex.containsKey(stateStoreName)) {
+        if (stateStoreNameToSourceTopics.containsKey(stateStoreName)
+            || stateStoreNameToSourceRegex.containsKey(stateStoreName)) {
             return;
         }
 
         final Set<String> sourceTopics = new HashSet<>();
         final Set<Pattern> sourcePatterns = new HashSet<>();
-        final Set<SourceNodeFactory> sourceNodesForPredecessor = findSourcesForProcessorPredecessors(processorNodeFactory.predecessors);
+        final Set<SourceNodeFactory> sourceNodesForPredecessor =
+            findSourcesForProcessorPredecessors(processorNodeFactory.predecessors);
 
         for (final SourceNodeFactory sourceNodeFactory : sourceNodesForPredecessor) {
             if (sourceNodeFactory.pattern != null) {
@@ -1019,7 +1018,9 @@ public class InternalTopologyBuilder {
                         if (internalTopicNames.contains(topic)) {
                             // prefix the internal topic name with the application id
                             final String internalTopic = decorateTopic(topic);
-                            repartitionTopics.put(internalTopic, new RepartitionTopicConfig(internalTopic, Collections.emptyMap()));
+                            repartitionTopics.put(
+                                internalTopic,
+                                new RepartitionTopicConfig(internalTopic, Collections.emptyMap()));
                             sourceTopics.add(internalTopic);
                         } else {
                             sourceTopics.add(topic);
@@ -1038,14 +1039,16 @@ public class InternalTopologyBuilder {
                     }
                 }
 
-                // if the node is connected to a state store whose changelog topics are not predefined, add to the changelog topics
+                // if the node is connected to a state store whose changelog topics are not predefined,
+                // add to the changelog topics
                 for (final StateStoreFactory stateFactory : stateFactories.values()) {
                     if (stateFactory.loggingEnabled() && stateFactory.users().contains(node)) {
                         final String topicName = storeToChangelogTopic.containsKey(stateFactory.name()) ?
                                 storeToChangelogTopic.get(stateFactory.name()) :
                                 ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.name());
                         if (!stateChangelogTopics.containsKey(topicName)) {
-                            final InternalTopicConfig internalTopicConfig = createChangelogTopicConfig(stateFactory, topicName);
+                            final InternalTopicConfig internalTopicConfig =
+                                createChangelogTopicConfig(stateFactory, topicName);
                             stateChangelogTopics.put(topicName, internalTopicConfig);
                         }
                     }
@@ -1066,7 +1069,8 @@ public class InternalTopologyBuilder {
     // Adjust the generated topology based on the configs.
     // Not exposed as public API and should be removed post 2.0
     private void adjust(final StreamsConfig config) {
-        final boolean enableOptimization20 = config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE);
+        final boolean enableOptimization20 =
+            config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE);
 
         if (enableOptimization20) {
             for (final Map.Entry<StoreBuilder, String> entry : storeToSourceChangelogTopic.entrySet()) {
@@ -1084,9 +1088,12 @@ public class InternalTopologyBuilder {
     private void setRegexMatchedTopicsToSourceNodes() {
         if (subscriptionUpdates.hasUpdates()) {
             for (final Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
-                final SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
+                final SourceNodeFactory sourceNode =
+                    (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
                 //need to update nodeToSourceTopics with topics matched from given regex
-                nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
+                nodeToSourceTopics.put(
+                    stringPatternEntry.getKey(),
+                    sourceNode.getTopics(subscriptionUpdates.getUpdates()));
                 log.debug("nodeToSourceTopics {}", nodeToSourceTopics);
             }
         }
@@ -1108,7 +1115,9 @@ public class InternalTopologyBuilder {
                     if (storeTopics != null) {
                         updatedTopicsForStateStore.addAll(storeTopics);
                     }
-                    stateStoreNameToSourceTopics.put(storePattern.getKey(), Collections.unmodifiableSet(updatedTopicsForStateStore));
+                    stateStoreNameToSourceTopics.put(
+                        storePattern.getKey(),
+                        Collections.unmodifiableSet(updatedTopicsForStateStore));
                 }
             }
         }
@@ -1205,11 +1214,11 @@ public class InternalTopologyBuilder {
         return applicationId + "-" + topic;
     }
 
-    public SubscriptionUpdates subscriptionUpdates() {
+    SubscriptionUpdates subscriptionUpdates() {
         return subscriptionUpdates;
     }
 
-    public synchronized Pattern sourceTopicPattern() {
+    synchronized Pattern sourceTopicPattern() {
         if (topicPattern == null) {
             final List<String> allSourceTopics = new ArrayList<>();
             if (!nodeToSourceTopics.isEmpty()) {
@@ -1263,7 +1272,9 @@ public class InternalTopologyBuilder {
         return description;
     }
 
-    private void describeGlobalStore(final TopologyDescription description, final Set<String> nodes, final int id) {
+    private void describeGlobalStore(final TopologyDescription description,
+                                     final Set<String> nodes,
+                                     final int id) {
         final Iterator<String> it = nodes.iterator();
         while (it.hasNext()) {
             final String node = it.next();
@@ -1314,7 +1325,8 @@ public class InternalTopologyBuilder {
 
     private final static NodeComparator NODE_COMPARATOR = new NodeComparator();
 
-    private static void updateSize(final AbstractNode node, final int delta) {
+    private static void updateSize(final AbstractNode node,
+                                   final int delta) {
         node.size += delta;
 
         for (final TopologyDescription.Node predecessor : node.predecessors()) {
@@ -1523,7 +1535,8 @@ public class InternalTopologyBuilder {
 
         @Override
         public String toString() {
-            return "Processor: " + name + " (stores: " + stores + ")\n      --> " + nodeNames(successors) + "\n      <-- " + nodeNames(predecessors);
+            return "Processor: " + name + " (stores: " + stores + ")\n      --> "
+                + nodeNames(successors) + "\n      <-- " + nodeNames(predecessors);
         }
 
         @Override
@@ -1592,7 +1605,8 @@ public class InternalTopologyBuilder {
             if (topicNameExtractor instanceof StaticTopicNameExtractor) {
                 return "Sink: " + name + " (topic: " + topic() + ")\n      <-- " + nodeNames(predecessors);
             }
-            return "Sink: " + name + " (extractor class: " + topicNameExtractor + ")\n      <-- " + nodeNames(predecessors);
+            return "Sink: " + name + " (extractor class: " + topicNameExtractor + ")\n      <-- "
+                + nodeNames(predecessors);
         }
 
         @Override
@@ -1678,8 +1692,8 @@ public class InternalTopologyBuilder {
     }
 
     public static class TopicsInfo {
-        public final Set<String> sinkTopics;
-        public final Set<String> sourceTopics;
+        final Set<String> sinkTopics;
+        final Set<String> sourceTopics;
         public final Map<String, InternalTopicConfig> stateChangelogTopics;
         public final Map<String, InternalTopicConfig> repartitionSourceTopics;
 
@@ -1775,10 +1789,8 @@ public class InternalTopologyBuilder {
             int globalStoresIndex = sortedGlobalStores.length - 1;
             while (subtopologiesIndex != -1 && globalStoresIndex != -1) {
                 sb.append("  ");
-                final TopologyDescription.Subtopology subtopology =
-                    sortedSubtopologies[subtopologiesIndex];
-                final TopologyDescription.GlobalStore globalStore =
-                    sortedGlobalStores[globalStoresIndex];
+                final TopologyDescription.Subtopology subtopology = sortedSubtopologies[subtopologiesIndex];
+                final TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex];
                 if (subtopology.id() == expectedId) {
                     sb.append(subtopology);
                     subtopologiesIndex--;
@@ -1789,15 +1801,13 @@ public class InternalTopologyBuilder {
                 expectedId++;
             }
             while (subtopologiesIndex != -1) {
-                final TopologyDescription.Subtopology subtopology =
-                    sortedSubtopologies[subtopologiesIndex];
+                final TopologyDescription.Subtopology subtopology = sortedSubtopologies[subtopologiesIndex];
                 sb.append("  ");
                 sb.append(subtopology);
                 subtopologiesIndex--;
             }
             while (globalStoresIndex != -1) {
-                final TopologyDescription.GlobalStore globalStore =
-                    sortedGlobalStores[globalStoresIndex];
+                final TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex];
                 sb.append("  ");
                 sb.append(globalStore);
                 globalStoresIndex--;
@@ -1868,7 +1878,8 @@ public class InternalTopologyBuilder {
         }
     }
 
-    public void updateSubscribedTopics(final Set<String> topics, final String logPrefix) {
+    void updateSubscribedTopics(final Set<String> topics,
+                                final String logPrefix) {
         final SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
         log.debug("{}found {} topics possibly matching regex", logPrefix, topics);
         // update the topic groups with the returned subscription set for regex pattern subscriptions
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index f3e5160..8076725 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -35,10 +35,10 @@ import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore;
 
 import java.time.Duration;
 import java.util.List;
-import org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore;
 
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 
@@ -96,12 +96,12 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
         if (!currentNode().stateStores.contains(name)) {
             throw new StreamsException("Processor " + currentNode().name() + " has no access to StateStore " + name +
-                    " as the store is not connected to the processor. If you add stores manually via '.addStateStore()' " +
-                    "make sure to connect the added store to the processor by providing the processor name to " +
-                    "'.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. " +
-                    "DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' " +
-                    "to connect the store to the corresponding operator. If you do not add stores manually, " +
-                    "please file a bug report at https://issues.apache.org/jira/projects/KAFKA.");
+                " as the store is not connected to the processor. If you add stores manually via '.addStateStore()' " +
+                "make sure to connect the added store to the processor by providing the processor name to " +
+                "'.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. " +
+                "DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' " +
+                "to connect the store to the corresponding operator. If you do not add stores manually, " +
+                "please file a bug report at https://issues.apache.org/jira/projects/KAFKA.");
         }
 
         final StateStore store = stateManager.getStore(name);
@@ -118,25 +118,35 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
     @SuppressWarnings("unchecked")
     @Override
-    public <K, V> void forward(final K key, final V value) {
+    public <K, V> void forward(final K key,
+                               final V value) {
         forward(key, value, SEND_TO_ALL);
     }
 
     @SuppressWarnings({"unchecked", "deprecation"})
     @Override
-    public <K, V> void forward(final K key, final V value, final int childIndex) {
-        forward(key, value, To.child(((List<ProcessorNode>) currentNode().children()).get(childIndex).name()));
+    public <K, V> void forward(final K key,
+                               final V value,
+                               final int childIndex) {
+        forward(
+            key,
+            value,
+            To.child(((List<ProcessorNode>) currentNode().children()).get(childIndex).name()));
     }
 
     @SuppressWarnings({"unchecked", "deprecation"})
     @Override
-    public <K, V> void forward(final K key, final V value, final String childName) {
+    public <K, V> void forward(final K key,
+                               final V value,
+                               final String childName) {
         forward(key, value, To.child(childName));
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public <K, V> void forward(final K key, final V value, final To to) {
+    public <K, V> void forward(final K key,
+                               final V value,
+                               final To to) {
         toInternal.update(to);
         if (toInternal.hasTimestamp()) {
             recordContext.setTimestamp(toInternal.timestamp());
@@ -148,8 +158,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
             if (sendTo != null) {
                 final ProcessorNode child = currentNode().getChild(sendTo);
                 if (child == null) {
-                    throw new StreamsException("Unknown downstream node: " + sendTo + " either does not exist or is not" +
-                            " connected to this processor.");
+                    throw new StreamsException("Unknown downstream node: " + sendTo
+                        + " either does not exist or is not connected to this processor.");
                 }
                 forward(child, key, value);
             } else {
@@ -182,7 +192,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
     @Override
     @Deprecated
-    public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) {
+    public Cancellable schedule(final long interval,
+                                final PunctuationType type,
+                                final Punctuator callback) {
         if (interval < 1) {
             throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond.");
         }
@@ -210,7 +222,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     private abstract static class StateStoreReadOnlyDecorator<T extends StateStore> extends AbstractStateStore {
         static final String ERROR_MESSAGE = "Global store is read only";
 
-        StateStoreReadOnlyDecorator(final T inner) {
+        private StateStoreReadOnlyDecorator(final T inner) {
             super(inner);
         }
 
@@ -225,7 +237,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
 
         @Override
-        public void init(final ProcessorContext context, final StateStore root) {
+        public void init(final ProcessorContext context,
+                         final StateStore root) {
             throw new UnsupportedOperationException(ERROR_MESSAGE);
         }
 
@@ -235,8 +248,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
     }
 
-    private static class KeyValueStoreReadOnlyDecorator<K, V> extends StateStoreReadOnlyDecorator<KeyValueStore<K, V>> implements KeyValueStore<K, V> {
-        KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> inner) {
+    private static class KeyValueStoreReadOnlyDecorator<K, V>
+        extends StateStoreReadOnlyDecorator<KeyValueStore<K, V>>
+        implements KeyValueStore<K, V> {
+
+        private KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> inner) {
             super(inner);
         }
 
@@ -246,7 +262,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
 
         @Override
-        public KeyValueIterator<K, V> range(final K from, final K to) {
+        public KeyValueIterator<K, V> range(final K from,
+                                            final K to) {
             return getInner().range(from, to);
         }
 
@@ -261,12 +278,14 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
 
         @Override
-        public void put(final K key, final V value) {
+        public void put(final K key,
+                        final V value) {
             throw new UnsupportedOperationException(ERROR_MESSAGE);
         }
 
         @Override
-        public V putIfAbsent(final K key, final V value) {
+        public V putIfAbsent(final K key,
+                             final V value) {
             throw new UnsupportedOperationException(ERROR_MESSAGE);
         }
 
@@ -281,35 +300,47 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
     }
 
-    private static class WindowStoreReadOnlyDecorator<K, V> extends StateStoreReadOnlyDecorator<WindowStore<K, V>> implements WindowStore<K, V> {
-        WindowStoreReadOnlyDecorator(final WindowStore<K, V> inner) {
+    private static class WindowStoreReadOnlyDecorator<K, V>
+        extends StateStoreReadOnlyDecorator<WindowStore<K, V>>
+        implements WindowStore<K, V> {
+
+        private WindowStoreReadOnlyDecorator(final WindowStore<K, V> inner) {
             super(inner);
         }
 
         @Override
-        public void put(final K key, final V value) {
+        public void put(final K key,
+                        final V value) {
             throw new UnsupportedOperationException(ERROR_MESSAGE);
         }
 
         @Override
-        public void put(final K key, final V value, final long windowStartTimestamp) {
+        public void put(final K key,
+                        final V value,
+                        final long windowStartTimestamp) {
             throw new UnsupportedOperationException(ERROR_MESSAGE);
         }
 
         @Override
-        public V fetch(final K key, final long time) {
+        public V fetch(final K key,
+                       final long time) {
             return getInner().fetch(key, time);
         }
 
         @Deprecated
         @Override
-        public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+        public WindowStoreIterator<V> fetch(final K key,
+                                            final long timeFrom,
+                                            final long timeTo) {
             return getInner().fetch(key, timeFrom, timeTo);
         }
 
         @Deprecated
         @Override
-        public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
+        public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+                                                      final K to,
+                                                      final long timeFrom,
+                                                      final long timeTo) {
             return getInner().fetch(from, to, timeFrom, timeTo);
         }
 
@@ -320,23 +351,32 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
         @Deprecated
         @Override
-        public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
+        public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
+                                                         final long timeTo) {
             return getInner().fetchAll(timeFrom, timeTo);
         }
     }
 
-    private static class SessionStoreReadOnlyDecorator<K, AGG> extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>> implements SessionStore<K, AGG> {
-        SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> inner) {
+    private static class SessionStoreReadOnlyDecorator<K, AGG>
+        extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>>
+        implements SessionStore<K, AGG> {
+
+        private SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> inner) {
             super(inner);
         }
 
         @Override
-        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
+        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
+                                                               final long earliestSessionEndTime,
+                                                               final long latestSessionStartTime) {
             return getInner().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
         }
 
         @Override
-        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
+        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
+                                                               final K keyTo,
+                                                               final long earliestSessionEndTime,
+                                                               final long latestSessionStartTime) {
             return getInner().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
         }
 
@@ -346,7 +386,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
 
         @Override
-        public void put(final Windowed<K> sessionKey, final AGG aggregate) {
+        public void put(final Windowed<K> sessionKey,
+                        final AGG aggregate) {
             throw new UnsupportedOperationException(ERROR_MESSAGE);
         }
 
@@ -356,7 +397,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
 
         @Override
-        public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to) {
+        public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
+                                                        final K to) {
             return getInner().fetch(from, to);
         }
     }
@@ -364,7 +406,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
     private abstract static class StateStoreReadWriteDecorator<T extends StateStore> extends AbstractStateStore {
         static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams";
 
-        StateStoreReadWriteDecorator(final T inner) {
+        private StateStoreReadWriteDecorator(final T inner) {
             super(inner);
         }
 
@@ -374,7 +416,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
 
         @Override
-        public void init(final ProcessorContext context, final StateStore root) {
+        public void init(final ProcessorContext context,
+                         final StateStore root) {
             throw new UnsupportedOperationException(ERROR_MESSAGE);
         }
 
@@ -384,8 +427,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
     }
 
-    private static class KeyValueStoreReadWriteDecorator<K, V> extends StateStoreReadWriteDecorator<KeyValueStore<K, V>> implements KeyValueStore<K, V> {
-        KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) {
+    private static class KeyValueStoreReadWriteDecorator<K, V>
+        extends StateStoreReadWriteDecorator<KeyValueStore<K, V>>
+        implements KeyValueStore<K, V> {
+
+        private KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) {
             super(inner);
         }
 
@@ -395,7 +441,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
 
         @Override
-        public KeyValueIterator<K, V> range(final K from, final K to) {
+        public KeyValueIterator<K, V> range(final K from,
+                                            final K to) {
             return wrapped().range(from, to);
         }
 
@@ -410,12 +457,14 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
 
         @Override
-        public void put(final K key, final V value) {
+        public void put(final K key,
+                        final V value) {
             wrapped().put(key, value);
         }
 
         @Override
-        public V putIfAbsent(final K key, final V value) {
+        public V putIfAbsent(final K key,
+                             final V value) {
             return wrapped().putIfAbsent(key, value);
         }
 
@@ -430,35 +479,47 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
     }
 
-    private static class WindowStoreReadWriteDecorator<K, V> extends StateStoreReadWriteDecorator<WindowStore<K, V>> implements WindowStore<K, V> {
-        WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
+    private static class WindowStoreReadWriteDecorator<K, V>
+        extends StateStoreReadWriteDecorator<WindowStore<K, V>>
+        implements WindowStore<K, V> {
+
+        private WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
             super(inner);
         }
 
         @Override
-        public void put(final K key, final V value) {
+        public void put(final K key,
+                        final V value) {
             wrapped().put(key, value);
         }
 
         @Override
-        public void put(final K key, final V value, final long windowStartTimestamp) {
+        public void put(final K key,
+                        final V value,
+                        final long windowStartTimestamp) {
             wrapped().put(key, value, windowStartTimestamp);
         }
 
         @Override
-        public V fetch(final K key, final long time) {
+        public V fetch(final K key,
+                       final long time) {
             return wrapped().fetch(key, time);
         }
 
         @Deprecated
         @Override
-        public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+        public WindowStoreIterator<V> fetch(final K key,
+                                            final long timeFrom,
+                                            final long timeTo) {
             return wrapped().fetch(key, timeFrom, timeTo);
         }
 
         @Deprecated
         @Override
-        public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
+        public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+                                                      final K to,
+                                                      final long timeFrom,
+                                                      final long timeTo) {
             return wrapped().fetch(from, to, timeFrom, timeTo);
         }
 
@@ -469,23 +530,32 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
         @Deprecated
         @Override
-        public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
+        public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
+                                                         final long timeTo) {
             return wrapped().fetchAll(timeFrom, timeTo);
         }
     }
 
-    private static class SessionStoreReadWriteDecorator<K, AGG> extends StateStoreReadWriteDecorator<SessionStore<K, AGG>> implements SessionStore<K, AGG> {
-        SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) {
+    private static class SessionStoreReadWriteDecorator<K, AGG>
+        extends StateStoreReadWriteDecorator<SessionStore<K, AGG>>
+        implements SessionStore<K, AGG> {
+
+        private SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) {
             super(inner);
         }
 
         @Override
-        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
+        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
+                                                               final long earliestSessionEndTime,
+                                                               final long latestSessionStartTime) {
             return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
         }
 
         @Override
-        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
+        public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
+                                                               final K keyTo,
+                                                               final long earliestSessionEndTime,
+                                                               final long latestSessionStartTime) {
             return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
         }
 
@@ -505,7 +575,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         }
 
         @Override
-        public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to) {
+        public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
+                                                        final K to) {
             return wrapped().fetch(from, to);
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index d08779f..cf830fb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -96,7 +96,8 @@ public class ProcessorStateManager extends AbstractStateManager {
     }
 
 
-    public static String storeChangelogTopic(final String applicationId, final String storeName) {
+    public static String storeChangelogTopic(final String applicationId,
+                                             final String storeName) {
         return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
     }
 
@@ -133,12 +134,13 @@ public class ProcessorStateManager extends AbstractStateManager {
             restoreCallbacks.put(topic, stateRestoreCallback);
         } else {
             log.trace("Restoring state store {} from changelog topic {}", storeName, topic);
-            final StateRestorer restorer = new StateRestorer(storePartition,
-                                                             new CompositeRestoreListener(stateRestoreCallback),
-                                                             checkpointableOffsets.get(storePartition),
-                                                             offsetLimit(storePartition),
-                                                             store.persistent(),
-                                                             storeName);
+            final StateRestorer restorer = new StateRestorer(
+                storePartition,
+                new CompositeRestoreListener(stateRestoreCallback),
+                checkpointableOffsets.get(storePartition),
+                offsetLimit(storePartition),
+                store.persistent(),
+                storeName);
 
             changelogReader.register(restorer);
         }
@@ -190,7 +192,8 @@ public class ProcessorStateManager extends AbstractStateManager {
         standbyRestoredOffsets.put(storePartition, lastOffset + 1);
     }
 
-    void putOffsetLimit(final TopicPartition partition, final long limit) {
+    void putOffsetLimit(final TopicPartition partition,
+                        final long limit) {
         log.trace("Updating store offset limit for partition {} to {}", partition, limit);
         offsetLimits.put(partition, limit);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index b4ad19f..03b9e2d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -22,7 +22,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
@@ -171,7 +171,7 @@ public class SubscriptionInfo {
         if (userEndPoint == null) {
             return new byte[0];
         } else {
-            return userEndPoint.getBytes(Charset.forName("UTF-8"));
+            return userEndPoint.getBytes(StandardCharsets.UTF_8);
         }
     }
 
@@ -318,7 +318,7 @@ public class SubscriptionInfo {
         if (bytesLength != 0) {
             final byte[] bytes = new byte[bytesLength];
             data.get(bytes);
-            subscriptionInfo.userEndPoint = new String(bytes, Charset.forName("UTF-8"));
+            subscriptionInfo.userEndPoint = new String(bytes, StandardCharsets.UTF_8);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
index d33e324..297e181 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.internals.CompositeReadOnlySessionStore;
@@ -23,37 +25,40 @@ import org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStore;
 import org.apache.kafka.streams.state.internals.StateStoreProvider;
 
 /**
- * Provides access to the {@link QueryableStoreType}s provided with KafkaStreams. These
- * can be used with {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType)}
- * To access and query the {@link StateStore}s that are part of a Topology
+ * Provides access to the {@link QueryableStoreType}s provided with {@link KafkaStreams}.
+ * These can be used with {@link KafkaStreams#store(String, QueryableStoreType)}
+ * to access and query the {@link StateStore}s that are part of a {@link Topology}.
  */
-public class QueryableStoreTypes {
+public final class QueryableStoreTypes {
 
     /**
-     * A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore}
-     * @param <K>   key type of the store
-     * @param <V>   value type of the store
-     * @return  {@link QueryableStoreTypes.KeyValueStoreType}
+     * A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore}.
+     *
+     * @param <K> key type of the store
+     * @param <V> value type of the store
+     * @return {@link QueryableStoreTypes.KeyValueStoreType}
      */
     public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, V>> keyValueStore() {
         return new KeyValueStoreType<>();
     }
 
     /**
-     * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore}
-     * @param <K>   key type of the store
-     * @param <V>   value type of the store
-     * @return  {@link QueryableStoreTypes.WindowStoreType}
+     * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore}.
+     *
+     * @param <K> key type of the store
+     * @param <V> value type of the store
+     * @return {@link QueryableStoreTypes.WindowStoreType}
      */
     public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStore() {
         return new WindowStoreType<>();
     }
 
     /**
-     * A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore}
-     * @param <K>   key type of the store
-     * @param <V>   value type of the store
-     * @return  {@link QueryableStoreTypes.SessionStoreType}
+     * A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore}.
+     *
+     * @param <K> key type of the store
+     * @param <V> value type of the store
+     * @return {@link QueryableStoreTypes.SessionStoreType}
      */
     public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, V>> sessionStore() {
         return new SessionStoreType<>();
@@ -104,7 +109,8 @@ public class QueryableStoreTypes {
             super(ReadOnlySessionStore.class);
         }
         @Override
-        public ReadOnlySessionStore<K, V> create(final StateStoreProvider storeProvider, final String storeName) {
+        public ReadOnlySessionStore<K, V> create(final StateStoreProvider storeProvider,
+                                                 final String storeName) {
             return new CompositeReadOnlySessionStore<>(storeProvider, this, storeName);
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index fcbd004..2203f59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -43,16 +43,16 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      * @param key The key to associate the value to
      * @param value The value to update, it can be null;
      *              if the serialized bytes are also null it is interpreted as deletes
-     * @throws NullPointerException If null is used for key.
+     * @throws NullPointerException if the given key is {@code null}
      */
     void put(K key, V value);
 
     /**
-     * Put a key-value pair with the given timestamp into the corresponding window
+     * Put a key-value pair into the window with the given window start timestamp
      * @param key The key to associate the value to
      * @param value The value; can be null
      * @param windowStartTimestamp The timestamp of the beginning of the window to put the key/value into
-     * @throws NullPointerException If null is used for key.
+     * @throws NullPointerException if the given key is {@code null}
      */
     void put(K key, V value, long windowStartTimestamp);
 
@@ -87,7 +87,7 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      * @param timeTo    time range end (inclusive)
      * @return an iterator over key-value pairs {@code <timestamp, value>}
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException If {@code null} is used for key.
+     * @throws NullPointerException if the given key is {@code null}
      */
     @SuppressWarnings("deprecation")
     WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
@@ -111,7 +111,7 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      * @param timeTo    time range end (inclusive)
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException If {@code null} is used for any key.
+     * @throws NullPointerException if one of the given keys is {@code null}
      */
     @SuppressWarnings("deprecation")
     KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
@@ -132,7 +132,6 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
      * @param timeTo   the end of the time slot from which to search (inclusive)
      * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
      * @throws InvalidStateStoreException if the store is not initialized
-     * @throws NullPointerException if {@code null} is used for any key
      */
     @SuppressWarnings("deprecation")
     KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 8d9b207..a736ab6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -59,7 +59,8 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
     }
 
     @Override
-    public void init(final ProcessorContext context, final StateStore root) {
+    public void init(final ProcessorContext context,
+                     final StateStore root) {
         initInternal(context);
         underlying.init(context, root);
         // save the stream thread as we only ever want to trigger a flush
@@ -76,17 +77,15 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
 
         this.cache = this.context.getCache();
         this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), underlying.name());
-        cache.addDirtyEntryFlushListener(cacheName, new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> entries) {
-                for (final ThreadCache.DirtyEntry entry : entries) {
-                    putAndMaybeForward(entry, (InternalProcessorContext) context);
-                }
+        cache.addDirtyEntryFlushListener(cacheName, entries -> {
+            for (final ThreadCache.DirtyEntry entry : entries) {
+                putAndMaybeForward(entry, (InternalProcessorContext) context);
             }
         });
     }
 
-    private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
+    private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
+                                    final InternalProcessorContext context) {
         final ProcessorRecordContext current = context.recordContext();
         try {
             context.setRecordContext(entry.entry().context());
@@ -190,7 +189,8 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
     }
 
     @Override
-    public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+    public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+                                                 final Bytes to) {
         validateStoreOpen();
         final KeyValueIterator<Bytes, byte[]> storeIterator = underlying.range(from, to);
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, from, to);
@@ -217,7 +217,8 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
     }
 
     @Override
-    public void put(final Bytes key, final byte[] value) {
+    public void put(final Bytes key,
+                    final byte[] value) {
         Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
         lock.writeLock().lock();
@@ -229,7 +230,8 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
         }
     }
 
-    private void putInternal(final Bytes key, final byte[] value) {
+    private void putInternal(final Bytes key,
+                             final byte[] value) {
         cache.put(
             cacheName,
             key,
@@ -244,7 +246,8 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
     }
 
     @Override
-    public byte[] putIfAbsent(final Bytes key, final byte[] value) {
+    public byte[] putIfAbsent(final Bytes key,
+                              final byte[] value) {
         Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
         lock.writeLock().lock();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 1d8fb58..94c250c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -38,19 +38,17 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractS
     }
 
     @Override
-    public void init(final ProcessorContext context, final StateStore root) {
+    public void init(final ProcessorContext context,
+                     final StateStore root) {
         inner.init(context, root);
         final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), inner.name());
         this.changeLogger = new StoreChangeLogger<>(inner.name(), context, new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
 
         // if the inner store is an LRU cache, add the eviction listener to log removed record
         if (inner instanceof MemoryLRUCache) {
-            ((MemoryLRUCache<Bytes, byte[]>) inner).whenEldestRemoved(new MemoryLRUCache.EldestEntryRemovalListener<Bytes, byte[]>() {
-                @Override
-                public void apply(final Bytes key, final byte[] value) {
-                    // pass null to indicate removal
-                    changeLogger.logChange(key, null);
-                }
+            ((MemoryLRUCache<Bytes, byte[]>) inner).setWhenEldestRemoved((key, value) -> {
+                // pass null to indicate removal
+                changeLogger.logChange(key, null);
             });
         }
     }
@@ -61,13 +59,15 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractS
     }
 
     @Override
-    public void put(final Bytes key, final byte[] value) {
+    public void put(final Bytes key,
+                    final byte[] value) {
         inner.put(key, value);
         changeLogger.logChange(key, value);
     }
 
     @Override
-    public byte[] putIfAbsent(final Bytes key, final byte[] value) {
+    public byte[] putIfAbsent(final Bytes key,
+                              final byte[] value) {
         final byte[] previous = get(key);
         if (previous == null) {
             put(key, value);
@@ -96,7 +96,8 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractS
     }
 
     @Override
-    public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+    public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+                                                 final Bytes to) {
         return inner.range(from, to);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
deleted file mode 100644
index 4dccd6e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
-
-import java.util.List;
-
-public class InMemoryKeyValueLoggedStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V> {
-
-    private final KeyValueStore<K, V> inner;
-    private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
-
-    private StoreChangeLogger<K, V> changeLogger;
-
-    public InMemoryKeyValueLoggedStore(final KeyValueStore<K, V> inner, final Serde<K> keySerde, final Serde<V> valueSerde) {
-        super(inner);
-        this.inner = inner;
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void init(final ProcessorContext context, final StateStore root) {
-        inner.init(context, root);
-
-        // construct the serde
-        final StateSerdes<K, V>  serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), inner.name()),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
-
-        this.changeLogger = new StoreChangeLogger<>(inner.name(), context, serdes);
-
-        // if the inner store is an LRU cache, add the eviction listener to log removed record
-        if (inner instanceof MemoryLRUCache) {
-            ((MemoryLRUCache<K, V>) inner).whenEldestRemoved(new MemoryNavigableLRUCache.EldestEntryRemovalListener<K, V>() {
-                @Override
-                public void apply(final K key, final V value) {
-                    removed(key);
-                }
-            });
-        }
-    }
-
-    @Override
-    public long approximateNumEntries() {
-        return inner.approximateNumEntries();
-    }
-
-    @Override
-    public V get(final K key) {
-        return this.inner.get(key);
-    }
-
-    @Override
-    public void put(final K key, final V value) {
-        this.inner.put(key, value);
-
-        changeLogger.logChange(key, value);
-    }
-
-    @Override
-    public V putIfAbsent(final K key, final V value) {
-        final V originalValue = this.inner.putIfAbsent(key, value);
-        if (originalValue == null) {
-            changeLogger.logChange(key, value);
-        }
-        return originalValue;
-    }
-
-    @Override
-    public void putAll(final List<KeyValue<K, V>> entries) {
-        this.inner.putAll(entries);
-
-        for (final KeyValue<K, V> entry : entries) {
-            final K key = entry.key;
-            changeLogger.logChange(key, entry.value);
-        }
-    }
-
-    @Override
-    public V delete(final K key) {
-        final V value = this.inner.delete(key);
-
-        removed(key);
-
-        return value;
-    }
-
-    /**
-     * Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
-     * store.
-     *
-     * @param key the key for the entry that the inner store removed
-     */
-    protected void removed(final K key) {
-        changeLogger.logChange(key, null);
-    }
-
-    @Override
-    public KeyValueIterator<K, V> range(final K from, final K to) {
-        return this.inner.range(from, to);
-    }
-
-    @Override
-    public KeyValueIterator<K, V> all() {
-        return this.inner.all();
-    }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 5a6b618..d6dd42a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -52,10 +52,6 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
         this.map = new TreeMap<>();
     }
 
-    public KeyValueStore<K, V> enableLogging() {
-        return new InMemoryKeyValueLoggedStore<>(this, keySerde, valueSerde);
-    }
-
     @Override
     public String name() {
         return this.name;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
index c790ee9..31169d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
+
 import java.util.Objects;
 
 public class KeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, KeyValueStore<K, V>> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 80078b7..f0c3c8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -43,18 +43,17 @@ import java.util.Objects;
 public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
 
     public interface EldestEntryRemovalListener<K, V> {
-
         void apply(K key, V value);
     }
-    private final Serde<K> keySerde;
 
+    private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private final String name;
     protected final Map<K, V> map;
 
     private StateSerdes<K, V> serdes;
-    private boolean restoring = false;      // TODO: this is a sub-optimal solution to avoid logging during restoration.
-                                            // in the future we should augment the StateRestoreCallback with onComplete etc to better resolve this.
+    private boolean restoring = false; // TODO: this is a sub-optimal solution to avoid logging during restoration.
+                                       // in the future we should augment the StateRestoreCallback with onComplete etc to better resolve this.
     private volatile boolean open = true;
 
     private EldestEntryRemovalListener<K, V> listener;
@@ -82,14 +81,8 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
         };
     }
 
-    KeyValueStore<K, V> enableLogging() {
-        return new InMemoryKeyValueLoggedStore<>(this, keySerde, valueSerde);
-    }
-
-    MemoryLRUCache<K, V> whenEldestRemoved(final EldestEntryRemovalListener<K, V> listener) {
+    void setWhenEldestRemoved(final EldestEntryRemovalListener<K, V> listener) {
         this.listener = listener;
-
-        return this;
     }
 
     @Override
@@ -99,7 +92,8 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
 
     @Override
     @SuppressWarnings("unchecked")
-    public void init(final ProcessorContext context, final StateStore root) {
+    public void init(final ProcessorContext context,
+                     final StateStore root) {
         // construct the serde
         this.serdes = new StateSerdes<>(
             ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
@@ -137,7 +131,8 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public synchronized void put(final K key, final V value) {
+    public synchronized void put(final K key,
+                                 final V value) {
         Objects.requireNonNull(key);
         if (value == null) {
             this.map.remove(key);
@@ -147,7 +142,8 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public synchronized V putIfAbsent(final K key, final V value) {
+    public synchronized V putIfAbsent(final K key,
+                                      final V value) {
         Objects.requireNonNull(key);
         final V originalValue = get(key);
         if (originalValue == null) {
@@ -173,7 +169,8 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
      * @throws UnsupportedOperationException at every invocation
      */
     @Override
-    public KeyValueIterator<K, V> range(final K from, final K to) {
+    public KeyValueIterator<K, V> range(final K from,
+                                        final K to) {
         throw new UnsupportedOperationException("MemoryLRUCache does not support range() function.");
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 57458fb..f3d1cae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -82,14 +82,14 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        this.metrics = (StreamsMetricsImpl) context.metrics();
+        metrics = (StreamsMetricsImpl) context.metrics();
 
         taskName = context.taskId().toString();
         final String metricsGroup = "stream-" + metricScope + "-metrics";
         final Map<String, String> taskTags = metrics.tagMap("task-id", taskName, metricScope + "-id", "all");
         final Map<String, String> storeTags = metrics.tagMap("task-id", taskName, metricScope + "-id", name());
 
-        this.serdes = new StateSerdes<>(
+        serdes = new StateSerdes<>(
             ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
             keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
             valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
@@ -132,9 +132,9 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
     public V get(final K key) {
         try {
             if (getTime.shouldRecord()) {
-                return measureLatency(() -> outerValue(inner.get(Bytes.wrap(serdes.rawKey(key)))), getTime);
+                return measureLatency(() -> outerValue(inner.get(keyBytes(key))), getTime);
             } else {
-                return outerValue(inner.get(Bytes.wrap(serdes.rawKey(key))));
+                return outerValue(inner.get(keyBytes(key)));
             }
         } catch (final ProcessorStateException e) {
             final String message = String.format(e.getMessage(), key);
@@ -148,11 +148,11 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
         try {
             if (putTime.shouldRecord()) {
                 measureLatency(() -> {
-                    inner.put(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value));
+                    inner.put(keyBytes(key), serdes.rawValue(value));
                     return null;
                 }, putTime);
             } else {
-                inner.put(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value));
+                inner.put(keyBytes(key), serdes.rawValue(value));
             }
         } catch (final ProcessorStateException e) {
             final String message = String.format(e.getMessage(), key, value);
@@ -165,10 +165,10 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
                          final V value) {
         if (putIfAbsentTime.shouldRecord()) {
             return measureLatency(
-                () -> outerValue(inner.putIfAbsent(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value))),
+                () -> outerValue(inner.putIfAbsent(keyBytes(key), serdes.rawValue(value))),
                 putIfAbsentTime);
         } else {
-            return outerValue(inner.putIfAbsent(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value)));
+            return outerValue(inner.putIfAbsent(keyBytes(key), serdes.rawValue(value)));
         }
     }
 
@@ -190,9 +190,9 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
     public V delete(final K key) {
         try {
             if (deleteTime.shouldRecord()) {
-                return measureLatency(() -> outerValue(inner.delete(Bytes.wrap(serdes.rawKey(key)))), deleteTime);
+                return measureLatency(() -> outerValue(inner.delete(keyBytes(key))), deleteTime);
             } else {
-                return outerValue(inner.delete(Bytes.wrap(serdes.rawKey(key))));
+                return outerValue(inner.delete(keyBytes(key)));
             }
         } catch (final ProcessorStateException e) {
             final String message = String.format(e.getMessage(), key);
@@ -204,13 +204,13 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
     public KeyValueIterator<K, V> range(final K from,
                                         final K to) {
         return new MeteredKeyValueIterator(
-            this.inner.range(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to))),
-            this.rangeTime);
+            inner.range(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to))),
+            rangeTime);
     }
 
     @Override
     public KeyValueIterator<K, V> all() {
-        return new MeteredKeyValueIterator(this.inner.all(), this.allTime);
+        return new MeteredKeyValueIterator(inner.all(), allTime);
     }
 
     @Override
@@ -245,6 +245,10 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
         return value == null ? null : serdes.valueFrom(value);
     }
 
+    private Bytes keyBytes(final K key) {
+        return Bytes.wrap(serdes.rawKey(key));
+    }
+
     private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K, V>> from) {
         final List<KeyValue<Bytes, byte[]>> byteEntries = new ArrayList<>();
         for (final KeyValue<K, V> entry : from) {
@@ -289,7 +293,7 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
             try {
                 iter.close();
             } finally {
-                metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
+                metrics.recordLatency(sensor, startNs, time.nanoseconds());
             }
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 65ab758..31a039b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -65,12 +65,14 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
 
     @SuppressWarnings("unchecked")
     @Override
-    public void init(final ProcessorContext context, final StateStore root) {
+    public void init(final ProcessorContext context,
+                     final StateStore root) {
         //noinspection unchecked
-        this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
-                                        keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                                        valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
-        this.metrics = (StreamsMetricsImpl) context.metrics();
+        serdes = new StateSerdes<>(
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+        metrics = (StreamsMetricsImpl) context.metrics();
 
         taskName = context.taskId().toString();
         final String metricsGroup = "stream-" + metricScope + "-metrics";
@@ -88,7 +90,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
         try {
             inner.init(context, root);
         } finally {
-            this.metrics.recordLatency(
+            metrics.recordLatency(
                 restoreTime,
                 startNs,
                 time.nanoseconds()
@@ -109,13 +111,15 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
                                                          final long latestSessionStartTime) {
         Objects.requireNonNull(key, "key cannot be null");
         final Bytes bytesKey = keyBytes(key);
-        return new MeteredWindowedKeyValueIterator<>(inner.findSessions(bytesKey,
-                                                                        earliestSessionEndTime,
-                                                                        latestSessionStartTime),
-                                                     fetchTime,
-                                                     metrics,
-                                                     serdes,
-                                                     time);
+        return new MeteredWindowedKeyValueIterator<>(
+            inner.findSessions(
+                bytesKey,
+                earliestSessionEndTime,
+                latestSessionStartTime),
+            fetchTime,
+            metrics,
+            serdes,
+            time);
     }
 
     @Override
@@ -127,14 +131,16 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
         Objects.requireNonNull(keyTo, "keyTo cannot be null");
         final Bytes bytesKeyFrom = keyBytes(keyFrom);
         final Bytes bytesKeyTo = keyBytes(keyTo);
-        return new MeteredWindowedKeyValueIterator<>(inner.findSessions(bytesKeyFrom,
-                                                                        bytesKeyTo,
-                                                                        earliestSessionEndTime,
-                                                                        latestSessionStartTime),
-                                                     fetchTime,
-                                                     metrics,
-                                                     serdes,
-                                                     time);
+        return new MeteredWindowedKeyValueIterator<>(
+            inner.findSessions(
+                bytesKeyFrom,
+                bytesKeyTo,
+                earliestSessionEndTime,
+                latestSessionStartTime),
+            fetchTime,
+            metrics,
+            serdes,
+            time);
     }
 
     @Override
@@ -148,22 +154,23 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
             final String message = String.format(e.getMessage(), sessionKey.key());
             throw new ProcessorStateException(message, e);
         } finally {
-            this.metrics.recordLatency(removeTime, startNs, time.nanoseconds());
+            metrics.recordLatency(removeTime, startNs, time.nanoseconds());
         }
     }
 
     @Override
-    public void put(final Windowed<K> sessionKey, final V aggregate) {
+    public void put(final Windowed<K> sessionKey,
+                    final V aggregate) {
         Objects.requireNonNull(sessionKey, "sessionKey can't be null");
         final long startNs = time.nanoseconds();
         try {
             final Bytes key = keyBytes(sessionKey.key());
-            this.inner.put(new Windowed<>(key, sessionKey.window()), serdes.rawValue(aggregate));
+            inner.put(new Windowed<>(key, sessionKey.window()), serdes.rawValue(aggregate));
         } catch (final ProcessorStateException e) {
             final String message = String.format(e.getMessage(), sessionKey.key(), aggregate);
             throw new ProcessorStateException(message, e);
         } finally {
-            this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
+            metrics.recordLatency(putTime, startNs, time.nanoseconds());
         }
     }
 
@@ -178,7 +185,8 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to) {
+    public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+                                                  final K to) {
         Objects.requireNonNull(from, "from cannot be null");
         Objects.requireNonNull(to, "to cannot be null");
         return findSessions(from, to, 0, Long.MAX_VALUE);
@@ -188,9 +196,9 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
     public void flush() {
         final long startNs = time.nanoseconds();
         try {
-            this.inner.flush();
+            inner.flush();
         } finally {
-            this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
+            metrics.recordLatency(flushTime, startNs, time.nanoseconds());
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index fefa772..166c300 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -66,12 +66,14 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
 
     @SuppressWarnings("unchecked")
     @Override
-    public void init(final ProcessorContext context, final StateStore root) {
+    public void init(final ProcessorContext context,
+                     final StateStore root) {
         this.context = context;
-        this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
-                                        keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-                                        valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
-        this.metrics = (StreamsMetricsImpl) context.metrics();
+        serdes = new StateSerdes<>(
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
+        metrics = (StreamsMetricsImpl) context.metrics();
 
         taskName = context.taskId().toString();
         final String metricsGroup = "stream-" + metricScope + "-metrics";
@@ -88,7 +90,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
         try {
             inner.init(context, root);
         } finally {
-            this.metrics.recordLatency(
+            metrics.recordLatency(
                 restoreTime,
                 startNs,
                 time.nanoseconds()
@@ -103,12 +105,15 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     }
 
     @Override
-    public void put(final K key, final V value) {
+    public void put(final K key,
+                    final V value) {
         put(key, value, context.timestamp());
     }
 
     @Override
-    public void put(final K key, final V value, final long windowStartTimestamp) {
+    public void put(final K key,
+                    final V value,
+                    final long windowStartTimestamp) {
         final long startNs = time.nanoseconds();
         try {
             inner.put(keyBytes(key), serdes.rawValue(value), windowStartTimestamp);
@@ -116,7 +121,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
             final String message = String.format(e.getMessage(), key, value);
             throw new ProcessorStateException(message, e);
         } finally {
-            metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
+            metrics.recordLatency(putTime, startNs, time.nanoseconds());
         }
     }
 
@@ -125,7 +130,8 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
     }
 
     @Override
-    public V fetch(final K key, final long timestamp) {
+    public V fetch(final K key,
+                   final long timestamp) {
         final long startNs = time.nanoseconds();
         try {
             final byte[] result = inner.fetch(keyBytes(key), timestamp);
@@ -134,13 +140,15 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
             }
             return serdes.valueFrom(result);
         } finally {
-            metrics.recordLatency(this.fetchTime, startNs, time.nanoseconds());
+            metrics.recordLatency(fetchTime, startNs, time.nanoseconds());
         }
     }
 
     @SuppressWarnings("deprecation")
     @Override
-    public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+    public WindowStoreIterator<V> fetch(final K key,
+                                        final long timeFrom,
+                                        final long timeTo) {
         return new MeteredWindowStoreIterator<>(inner.fetch(keyBytes(key), timeFrom, timeTo),
                                                 fetchTime,
                                                 metrics,
@@ -155,22 +163,28 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
 
     @SuppressWarnings("deprecation")
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
-        return new MeteredWindowedKeyValueIterator<>(inner.fetchAll(timeFrom, timeTo),
-                                                     fetchTime,
-                                                     metrics,
-                                                     serdes,
-                                                     time);
+    public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
+                                                     final long timeTo) {
+        return new MeteredWindowedKeyValueIterator<>(
+            inner.fetchAll(timeFrom, timeTo),
+            fetchTime,
+            metrics,
+            serdes,
+            time);
     }
 
     @SuppressWarnings("deprecation")
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
-        return new MeteredWindowedKeyValueIterator<>(inner.fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo),
-                                                     fetchTime,
-                                                     metrics,
-                                                     serdes,
-                                                     time);
+    public KeyValueIterator<Windowed<K>, V> fetch(final K from,
+                                                  final K to,
+                                                  final long timeFrom,
+                                                  final long timeTo) {
+        return new MeteredWindowedKeyValueIterator<>(
+            inner.fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo),
+            fetchTime,
+            metrics,
+            serdes,
+            time);
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
index 4270d27..2bfecd2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java
@@ -21,9 +21,10 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
+import static java.util.Collections.singletonList;
+
 /**
  * A wrapper over all of the {@link StateStoreProvider}s in a Topology
  */
@@ -47,10 +48,11 @@ public class QueryableStoreProvider {
      * @param <T>                The expected type of the returned store
      * @return A composite object that wraps the store instances.
      */
-    public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) {
+    public <T> T getStore(final String storeName,
+                          final QueryableStoreType<T> queryableStoreType) {
         final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
         if (!globalStore.isEmpty()) {
-            return queryableStoreType.create(new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(globalStoreProvider)), storeName);
+            return queryableStoreType.create(new WrappingStoreProvider(singletonList(globalStoreProvider)), storeName);
         }
         final List<T> allStores = new ArrayList<>();
         for (final StateStoreProvider storeProvider : storeProviders) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index e19bb6d..35fb5bb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -103,7 +103,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
         this(name, DB_FILE_DIR);
     }
 
-    RocksDBStore(final String name, final String parentDir) {
+    RocksDBStore(final String name,
+                 final String parentDir) {
         this.name = name;
         this.parentDir = parentDir;
     }
@@ -222,7 +223,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
     }
 
     void toggleDbForBulkLoading(final boolean prepareForBulkload) {
-
         if (prepareForBulkload) {
             // if the store is not empty, we need to compact to get around the num.levels check
             // for bulk loading
@@ -434,7 +434,10 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
         }
     }
 
-    private class RocksDbIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> implements KeyValueIterator<Bytes, byte[]> {
+    private class RocksDbIterator
+        extends AbstractIterator<KeyValue<Bytes, byte[]>>
+        implements KeyValueIterator<Bytes, byte[]> {
+
         private final String storeName;
         private final RocksIterator iter;
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 98555ad..74134d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -39,11 +39,16 @@ class StoreChangeLogger<K, V> {
     private final ProcessorContext context;
     private final RecordCollector collector;
 
-    StoreChangeLogger(final String storeName, final ProcessorContext context, final StateSerdes<K, V> serialization) {
+    StoreChangeLogger(final String storeName,
+                      final ProcessorContext context,
+                      final StateSerdes<K, V> serialization) {
         this(storeName, context, context.taskId().partition, serialization);
     }
 
-    private StoreChangeLogger(final String storeName, final ProcessorContext context, final int partition, final StateSerdes<K, V> serialization) {
+    private StoreChangeLogger(final String storeName,
+                              final ProcessorContext context,
+                              final int partition,
+                              final StateSerdes<K, V> serialization) {
         this.topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
         this.context = context;
         this.partition = partition;
@@ -51,7 +56,8 @@ class StoreChangeLogger<K, V> {
         this.collector = ((RecordCollector.Supplier) context).recordCollector();
     }
 
-    void logChange(final K key, final V value) {
+    void logChange(final K key,
+                   final V value) {
         if (collector != null) {
             final Serializer<K> keySerializer = serialization.keySerializer();
             final Serializer<V> valueSerializer = serialization.valueSerializer();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
index 31d063a..058a249 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 
-
 public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> {
 
     private final WindowBytesStoreSupplier storeSupplier;
@@ -37,22 +36,24 @@ public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowS
 
     @Override
     public WindowStore<K, V> build() {
-        return new MeteredWindowStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
-                                        storeSupplier.metricsScope(),
-                                        time,
-                                        keySerde,
-                                        valueSerde);
+        return new MeteredWindowStore<>(
+            maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
+            storeSupplier.metricsScope(),
+            time,
+            keySerde,
+            valueSerde);
     }
 
     private WindowStore<Bytes, byte[]> maybeWrapCaching(final WindowStore<Bytes, byte[]> inner) {
         if (!enableCaching) {
             return inner;
         }
-        return new CachingWindowStore<>(inner,
-                                        keySerde,
-                                        valueSerde,
-                                        storeSupplier.windowSize(),
-                                        storeSupplier.segmentIntervalMs());
+        return new CachingWindowStore<>(
+            inner,
+            keySerde,
+            valueSerde,
+            storeSupplier.windowSize(),
+            storeSupplier.segmentIntervalMs());
     }
 
     private WindowStore<Bytes, byte[]> maybeWrapLogging(final WindowStore<Bytes, byte[]> inner) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
index 6de39cc..d94e7cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
@@ -30,7 +30,7 @@ public class WrappingStoreProvider implements StateStoreProvider {
 
     private final List<StateStoreProvider> storeProviders;
 
-    public WrappingStoreProvider(final List<StateStoreProvider> storeProviders) {
+    WrappingStoreProvider(final List<StateStoreProvider> storeProviders) {
         this.storeProviders = storeProviders;
     }
 
@@ -42,11 +42,11 @@ public class WrappingStoreProvider implements StateStoreProvider {
      * @param <T>       The type of the Store, for example, {@link org.apache.kafka.streams.state.ReadOnlyKeyValueStore}
      * @return  a List of all the stores with the storeName and are accepted by {@link QueryableStoreType#accepts(StateStore)}
      */
-    public <T> List<T> stores(final String storeName, final QueryableStoreType<T> type) {
+    public <T> List<T> stores(final String storeName,
+                              final QueryableStoreType<T> type) {
         final List<T> allStores = new ArrayList<>();
         for (final StateStoreProvider provider : storeProviders) {
-            final List<T> stores =
-                provider.stores(storeName, type);
+            final List<T> stores = provider.stores(storeName, type);
             allStores.addAll(stores);
         }
         if (allStores.isEmpty()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index d431dbe..17d403f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -33,8 +33,8 @@ import java.util.TreeSet;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsCollectionContaining.hasItem;
-import static org.hamcrest.core.IsCollectionContaining.hasItems;
+import static org.hamcrest.core.IsIterableContaining.hasItem;
+import static org.hamcrest.core.IsIterableContaining.hasItems;
 import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertTrue;
 


Mime
View raw message