kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: MINOR: code cleanup (#6057)
Date Wed, 09 Jan 2019 14:05:01 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 86de2df  MINOR: code cleanup (#6057)
86de2df is described below

commit 86de2dfd27f96dd3fea0635ad110c63bc639c721
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Wed Jan 9 15:04:52 2019 +0100

    MINOR: code cleanup (#6057)
    
    Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../streams/integration/EosIntegrationTest.java    | 267 +++++++++------------
 .../GlobalKTableEOSIntegrationTest.java            | 126 ++++------
 .../integration/GlobalKTableIntegrationTest.java   | 102 +++-----
 .../integration/GlobalThreadShutDownOrderTest.java |  68 ++----
 .../KTableSourceTopicRestartIntegrationTest.java   |  31 +--
 .../PurgeRepartitionTopicIntegrationTest.java      |  71 +++---
 .../integration/QueryableStateIntegrationTest.java | 254 +++++++++++++-------
 7 files changed, 442 insertions(+), 477 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index bdfbb3b..505d454 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -40,7 +41,6 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -49,6 +49,7 @@ import org.junit.experimental.categories.Category;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -98,7 +99,7 @@ public class EosIntegrationTest {
     private int testNumber = 0;
 
     @Before
-    public void createTopics() throws InterruptedException {
+    public void createTopics() throws Exception {
         applicationId = "appId-" + ++testNumber;
         CLUSTER.deleteTopicsAndWait(
             SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_INPUT_TOPIC,
@@ -154,26 +155,24 @@ public class EosIntegrationTest {
         output.to(outputTopic);
 
         for (int i = 0; i < numberOfRestarts; ++i) {
-            final KafkaStreams streams = new KafkaStreams(
-                builder.build(),
-                StreamsTestUtils.getStreamsConfig(
-                    applicationId,
-                    CLUSTER.bootstrapServers(),
-                    Serdes.LongSerde.class.getName(),
-                    Serdes.LongSerde.class.getName(),
-                    new Properties() {
-                        {
-                            put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
-                            put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-                            put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-                            put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
-                            put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
-                            put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
-                            put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-                        }
-                    }));
+            final Properties config = StreamsTestUtils.getStreamsConfig(
+                applicationId,
+                CLUSTER.bootstrapServers(),
+                Serdes.LongSerde.class.getName(),
+                Serdes.LongSerde.class.getName(),
+                new Properties() {
+                    {
+                        put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+                        put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+                        put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+                        put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1);
+                        put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
+                        put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
+                        put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+                    }
+                });
 
-            try {
+            try (final KafkaStreams streams = new KafkaStreams(builder.build(), config)) {
                 streams.start();
 
                 final List<KeyValue<Long, Long>> inputData = prepareData(i * 100, i * 100 + 10L, 0L, 1L);
@@ -192,18 +191,15 @@ public class EosIntegrationTest {
                             CONSUMER_GROUP_ID,
                             LongDeserializer.class,
                             LongDeserializer.class,
-                            new Properties() {
-                                {
-                                    put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
-                                }
-                            }),
+                            Utils.mkProperties(Collections.singletonMap(
+                                ConsumerConfig.ISOLATION_LEVEL_CONFIG,
+                                IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)))
+                            ),
                         outputTopic,
                         inputData.size()
                     );
 
                 checkResultPerKey(committedRecords, inputData);
-            } finally {
-                streams.close();
             }
         }
     }
@@ -220,13 +216,15 @@ public class EosIntegrationTest {
 
     }
 
-    private void addAllKeys(final Set<Long> allKeys, final List<KeyValue<Long, Long>> records) {
+    private void addAllKeys(final Set<Long> allKeys,
+                            final List<KeyValue<Long, Long>> records) {
         for (final KeyValue<Long, Long> record : records) {
             allKeys.add(record.key);
         }
     }
 
-    private List<KeyValue<Long, Long>> getAllRecordPerKey(final Long key, final List<KeyValue<Long, Long>> records) {
+    private List<KeyValue<Long, Long>> getAllRecordPerKey(final Long key,
+                                                          final List<KeyValue<Long, Long>> records) {
         final List<KeyValue<Long, Long>> recordsPerKey = new ArrayList<>(records.size());
 
         for (final KeyValue<Long, Long> record : records) {
@@ -243,24 +241,21 @@ public class EosIntegrationTest {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.stream(SINGLE_PARTITION_INPUT_TOPIC).to(SINGLE_PARTITION_OUTPUT_TOPIC);
 
-        final KafkaStreams streams = new KafkaStreams(
-            builder.build(),
-            StreamsTestUtils.getStreamsConfig(
-                applicationId,
-                CLUSTER.bootstrapServers(),
-                Serdes.LongSerde.class.getName(),
-                Serdes.LongSerde.class.getName(),
-                new Properties() {
-                    {
-                        put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
-                        put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-                        put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-                        put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
-                        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-                    }
-                }));
-
-        try {
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        final Properties config = StreamsTestUtils.getStreamsConfig(
+            applicationId,
+            CLUSTER.bootstrapServers(),
+            Serdes.LongSerde.class.getName(),
+            Serdes.LongSerde.class.getName(),
+            properties);
+
+        try (final KafkaStreams streams = new KafkaStreams(builder.build(), config)) {
             streams.start();
 
             final List<KeyValue<Long, Long>> firstBurstOfData = prepareData(0L, 5L, 0L);
@@ -315,8 +310,6 @@ public class EosIntegrationTest {
                 );
 
             assertThat(secondCommittedRecords, equalTo(secondBurstOfData));
-        } finally {
-            streams.close();
         }
     }
 
@@ -330,8 +323,7 @@ public class EosIntegrationTest {
         // -> the failure only kills one thread
         // after fail over, we should read 40 committed records (even if 50 record got written)
 
-        final KafkaStreams streams = getKafkaStreams(false, "appDir", 2);
-        try {
+        try (final KafkaStreams streams = getKafkaStreams(false, "appDir", 2)) {
             streams.start();
 
             final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
@@ -345,12 +337,9 @@ public class EosIntegrationTest {
 
             writeInputData(committedDataBeforeFailure);
 
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return commitRequested.get() == 2;
-                }
-            }, MAX_WAIT_TIME_MS, "SteamsTasks did not request commit.");
+            TestUtils.waitForCondition(
+                () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
+                "SteamsTasks did not request commit.");
 
             writeInputData(uncommittedDataBeforeFailure);
 
@@ -363,12 +352,9 @@ public class EosIntegrationTest {
             errorInjected.set(true);
             writeInputData(dataAfterFailure);
 
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return uncaughtException != null;
-                }
-            }, MAX_WAIT_TIME_MS, "Should receive uncaught exception from one StreamThread.");
+            TestUtils.waitForCondition(
+                () -> uncaughtException != null, MAX_WAIT_TIME_MS,
+                "Should receive uncaught exception from one StreamThread.");
 
             final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
                 committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
@@ -389,8 +375,6 @@ public class EosIntegrationTest {
 
             checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery);
             checkResultPerKey(committedRecordsAfterFailure, expectedCommittedRecordsAfterRecovery);
-        } finally {
-            streams.close();
         }
     }
 
@@ -407,8 +391,7 @@ public class EosIntegrationTest {
         // after fail over, we should read 40 committed records and the state stores should contain the correct sums
         // per key (even if some records got processed twice)
 
-        final KafkaStreams streams = getKafkaStreams(true, "appDir", 2);
-        try {
+        try (final KafkaStreams streams = getKafkaStreams(true, "appDir", 2)) {
             streams.start();
 
             final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
@@ -422,12 +405,9 @@ public class EosIntegrationTest {
 
             writeInputData(committedDataBeforeFailure);
 
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return commitRequested.get() == 2;
-                }
-            }, MAX_WAIT_TIME_MS, "SteamsTasks did not request commit.");
+            TestUtils.waitForCondition(
+                () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
+                "SteamsTasks did not request commit.");
 
             writeInputData(uncommittedDataBeforeFailure);
 
@@ -442,12 +422,9 @@ public class EosIntegrationTest {
             errorInjected.set(true);
             writeInputData(dataAfterFailure);
 
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return uncaughtException != null;
-                }
-            }, MAX_WAIT_TIME_MS, "Should receive uncaught exception from one StreamThread.");
+            TestUtils.waitForCondition(
+                () -> uncaughtException != null, MAX_WAIT_TIME_MS,
+                "Should receive uncaught exception from one StreamThread.");
 
             final List<KeyValue<Long, Long>> allCommittedRecords = readResult(
                 committedDataBeforeFailure.size() + uncommittedDataBeforeFailure.size() + dataAfterFailure.size(),
@@ -465,11 +442,11 @@ public class EosIntegrationTest {
             final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(allExpectedCommittedRecordsAfterRecovery);
 
             checkResultPerKey(allCommittedRecords, expectedResult);
-            checkResultPerKey(committedRecordsAfterFailure, expectedResult.subList(committedDataBeforeFailure.size(), expectedResult.size()));
+            checkResultPerKey(
+                committedRecordsAfterFailure,
+                expectedResult.subList(committedDataBeforeFailure.size(), expectedResult.size()));
 
             verifyStateStore(streams, getMaxPerKey(expectedResult));
-        } finally {
-            streams.close();
         }
     }
 
@@ -486,9 +463,10 @@ public class EosIntegrationTest {
         // afterwards, the "stalling" thread resumes, and another rebalance should get triggered
         // we write the remaining 20 records and verify to read 60 result records
 
-        final KafkaStreams streams1 = getKafkaStreams(false, "appDir1", 1);
-        final KafkaStreams streams2 = getKafkaStreams(false, "appDir2", 1);
-        try {
+        try (
+            final KafkaStreams streams1 = getKafkaStreams(false, "appDir1", 1);
+            final KafkaStreams streams2 = getKafkaStreams(false, "appDir2", 1)
+        ) {
             streams1.start();
             streams2.start();
 
@@ -505,12 +483,9 @@ public class EosIntegrationTest {
 
             writeInputData(committedDataBeforeGC);
 
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return commitRequested.get() == 2;
-                }
-            }, MAX_WAIT_TIME_MS, "SteamsTasks did not request commit.");
+            TestUtils.waitForCondition(
+                () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
+                "SteamsTasks did not request commit.");
 
             writeInputData(uncommittedDataBeforeGC);
 
@@ -523,14 +498,12 @@ public class EosIntegrationTest {
             gcInjected.set(true);
             writeInputData(dataToTriggerFirstRebalance);
 
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return streams1.allMetadata().size() == 1 && streams2.allMetadata().size() == 1 &&
-                        (streams1.allMetadata().iterator().next().topicPartitions().size() == 2
-                        || streams2.allMetadata().iterator().next().topicPartitions().size() == 2);
-                }
-            }, MAX_WAIT_TIME_MS, "Should have rebalanced.");
+            TestUtils.waitForCondition(
+                () -> streams1.allMetadata().size() == 1
+                    && streams2.allMetadata().size() == 1
+                    && (streams1.allMetadata().iterator().next().topicPartitions().size() == 2
+                        || streams2.allMetadata().iterator().next().topicPartitions().size() == 2),
+                MAX_WAIT_TIME_MS, "Should have rebalanced.");
 
             final List<KeyValue<Long, Long>> committedRecordsAfterRebalance = readResult(
                 uncommittedDataBeforeGC.size() + dataToTriggerFirstRebalance.size(),
@@ -543,14 +516,13 @@ public class EosIntegrationTest {
             checkResultPerKey(committedRecordsAfterRebalance, expectedCommittedRecordsAfterRebalance);
 
             doGC = false;
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return streams1.allMetadata().size() == 1 && streams2.allMetadata().size() == 1
-                        && streams1.allMetadata().iterator().next().topicPartitions().size() == 1
-                        && streams2.allMetadata().iterator().next().topicPartitions().size() == 1;
-                }
-            }, MAX_WAIT_TIME_MS, "Should have rebalanced.");
+            TestUtils.waitForCondition(
+                () -> streams1.allMetadata().size() == 1
+                    && streams2.allMetadata().size() == 1
+                    && streams1.allMetadata().iterator().next().topicPartitions().size() == 1
+                    && streams2.allMetadata().iterator().next().topicPartitions().size() == 1,
+                MAX_WAIT_TIME_MS,
+                "Should have rebalanced.");
 
             writeInputData(dataAfterSecondRebalance);
 
@@ -566,13 +538,12 @@ public class EosIntegrationTest {
             allExpectedCommittedRecordsAfterRecovery.addAll(dataAfterSecondRebalance);
 
             checkResultPerKey(allCommittedRecords, allExpectedCommittedRecordsAfterRecovery);
-        } finally {
-            streams1.close();
-            streams2.close();
         }
     }
 
-    private List<KeyValue<Long, Long>> prepareData(final long fromInclusive, final long toExclusive, final Long... keys) {
+    private List<KeyValue<Long, Long>> prepareData(final long fromInclusive,
+                                                   final long toExclusive,
+                                                   final Long... keys) {
         final List<KeyValue<Long, Long>> data = new ArrayList<>();
 
         for (final Long k : keys) {
@@ -584,7 +555,9 @@ public class EosIntegrationTest {
         return data;
     }
 
-    private KafkaStreams getKafkaStreams(final boolean withState, final String appDir, final int numberOfStreamsThreads) {
+    private KafkaStreams getKafkaStreams(final boolean withState,
+                                         final String appDir,
+                                         final int numberOfStreamsThreads) {
         commitRequested = new AtomicInteger(0);
         errorInjected = new AtomicBoolean(false);
         gcInjected = new AtomicBoolean(false);
@@ -619,7 +592,8 @@ public class EosIntegrationTest {
                     }
 
                     @Override
-                    public KeyValue<Long, Long> transform(final Long key, final Long value) {
+                    public KeyValue<Long, Long> transform(final Long key,
+                                                          final Long value) {
                         if (gcInjected.compareAndSet(true, false)) {
                             while (doGC) {
                                 try {
@@ -666,38 +640,34 @@ public class EosIntegrationTest {
             } }, storeNames)
             .to(SINGLE_PARTITION_OUTPUT_TOPIC);
 
-        final KafkaStreams streams = new KafkaStreams(
-            builder.build(),
-            StreamsTestUtils.getStreamsConfig(
-                applicationId,
-                CLUSTER.bootstrapServers(),
-                Serdes.LongSerde.class.getName(),
-                Serdes.LongSerde.class.getName(),
-                new Properties() {
-                    {
-                        put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
-                        put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfStreamsThreads);
-                        put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
-                        put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
-                        put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
-                        put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);
-                        put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
-                        put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
-                        put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-                        put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);
-                        put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:2142");
-                    }
-                }));
-
-        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            @Override
-            public void uncaughtException(final Thread t, final Throwable e) {
-                if (uncaughtException != null) {
-                    e.printStackTrace(System.err);
-                    fail("Should only get one uncaught exception from Streams.");
-                }
-                uncaughtException = e;
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
+        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfStreamsThreads);
+        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), "1000");
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5 * 1000);
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
+        properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
+        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+        properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);
+        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:2142");
+
+        final Properties config = StreamsTestUtils.getStreamsConfig(
+            applicationId,
+            CLUSTER.bootstrapServers(),
+            Serdes.LongSerde.class.getName(),
+            Serdes.LongSerde.class.getName(),
+            properties);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+
+        streams.setUncaughtExceptionHandler((t, e) -> {
+            if (uncaughtException != null) {
+                e.printStackTrace(System.err);
+                fail("Should only get one uncaught exception from Streams.");
             }
+            uncaughtException = e;
         });
 
         return streams;
@@ -713,7 +683,7 @@ public class EosIntegrationTest {
     }
 
     private List<KeyValue<Long, Long>> readResult(final int numberOfRecords,
-                                                  final String groupId) throws InterruptedException {
+                                                  final String groupId) throws Exception {
         if (groupId != null) {
             return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                 TestUtils.consumerConfig(
@@ -778,13 +748,14 @@ public class EosIntegrationTest {
         return expectedResult;
     }
 
-    private void verifyStateStore(final KafkaStreams streams, final Set<KeyValue<Long, Long>> expectedStoreContent) {
+    private void verifyStateStore(final KafkaStreams streams,
+                                  final Set<KeyValue<Long, Long>> expectedStoreContent) {
         ReadOnlyKeyValueStore<Long, Long> store = null;
 
         final long maxWaitingTime = System.currentTimeMillis() + 300000L;
         while (System.currentTimeMillis() < maxWaitingTime) {
             try {
-                store = streams.store(storeName, QueryableStoreTypes.<Long, Long>keyValueStore());
+                store = streams.store(storeName, QueryableStoreTypes.keyValueStore());
                 break;
             } catch (final InvalidStateStoreException okJustRetry) {
                 try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 084519e..787cb29 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -31,6 +30,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
@@ -41,7 +41,6 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -49,7 +48,6 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -72,18 +70,8 @@ public class GlobalKTableEOSIntegrationTest {
 
     private static volatile int testNo = 0;
     private final MockTime mockTime = CLUSTER.time;
-    private final KeyValueMapper<String, Long, Long> keyMapper = new KeyValueMapper<String, Long, Long>() {
-        @Override
-        public Long apply(final String key, final Long value) {
-            return value;
-        }
-    };
-    private final ValueJoiner<Long, String, String> joiner = new ValueJoiner<Long, String, String>() {
-        @Override
-        public String apply(final Long value1, final String value2) {
-            return value1 + "+" + value2;
-        }
-    };
+    private final KeyValueMapper<String, Long, Long> keyMapper = (key, value) -> value;
+    private final ValueJoiner<Long, String, String> joiner = (value1, value2) -> value1 + "+" + value2;
     private final String globalStore = "globalStore";
     private final Map<String, String> results = new HashMap<>();
     private StreamsBuilder builder;
@@ -96,7 +84,7 @@ public class GlobalKTableEOSIntegrationTest {
     private ForeachAction<String, String> foreachAction;
 
     @Before
-    public void before() throws InterruptedException {
+    public void before() throws Exception {
         testNo++;
         builder = new StreamsBuilder();
         createTopics();
@@ -116,16 +104,11 @@ public class GlobalKTableEOSIntegrationTest {
                                                   .withValueSerde(Serdes.String()));
         final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
         stream = builder.stream(streamTopic, stringLongConsumed);
-        foreachAction = new ForeachAction<String, String>() {
-            @Override
-            public void apply(final String key, final String value) {
-                results.put(key, value);
-            }
-        };
+        foreachAction = results::put;
     }
 
     @After
-    public void whenShuttingDown() throws IOException {
+    public void whenShuttingDown() throws Exception {
         if (kafkaStreams != null) {
             kafkaStreams.close();
         }
@@ -147,24 +130,22 @@ public class GlobalKTableEOSIntegrationTest {
         expected.put("d", "4+D");
         expected.put("e", "5+null");
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return results.equals(expected);
-            }
-        }, 30000L, "waiting for initial values");
+        TestUtils.waitForCondition(
+            () -> results.equals(expected),
+            30000L,
+            "waiting for initial values");
 
 
         produceGlobalTableValues();
 
-        final ReadOnlyKeyValueStore<Long, String> replicatedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+        final ReadOnlyKeyValueStore<Long, String> replicatedStore =
+            kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
+
+        TestUtils.waitForCondition(
+            () -> "J".equals(replicatedStore.get(5L)),
+            30000,
+            "waiting for data in replicated store");
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return "J".equals(replicatedStore.get(5L));
-            }
-        }, 30000, "waiting for data in replicated store");
         produceTopicValues(streamTopic);
 
         expected.put("a", "1+F");
@@ -173,12 +154,10 @@ public class GlobalKTableEOSIntegrationTest {
         expected.put("d", "4+I");
         expected.put("e", "5+J");
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return results.equals(expected);
-            }
-        }, 30000L, "waiting for final values");
+        TestUtils.waitForCondition(
+            () -> results.equals(expected),
+            30000L,
+            "waiting for final values");
     }
 
     @Test
@@ -195,24 +174,21 @@ public class GlobalKTableEOSIntegrationTest {
         expected.put("c", "3+C");
         expected.put("d", "4+D");
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return results.equals(expected);
-            }
-        }, 30000L, "waiting for initial values");
+        TestUtils.waitForCondition(
+            () -> results.equals(expected),
+            30000L,
+            "waiting for initial values");
 
 
         produceGlobalTableValues();
 
-        final ReadOnlyKeyValueStore<Long, String> replicatedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+        final ReadOnlyKeyValueStore<Long, String> replicatedStore =
+            kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return "J".equals(replicatedStore.get(5L));
-            }
-        }, 30000, "waiting for data in replicated store");
+        TestUtils.waitForCondition(
+            () -> "J".equals(replicatedStore.get(5L)),
+            30000,
+            "waiting for data in replicated store");
 
         produceTopicValues(streamTopic);
 
@@ -222,12 +198,10 @@ public class GlobalKTableEOSIntegrationTest {
         expected.put("d", "4+I");
         expected.put("e", "5+J");
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return results.equals(expected);
-            }
-        }, 30000L, "waiting for final values");
+        TestUtils.waitForCondition(
+            () -> results.equals(expected),
+            30000L,
+            "waiting for final values");
     }
 
     @Test
@@ -242,12 +216,11 @@ public class GlobalKTableEOSIntegrationTest {
         expected.put(3L, "C");
         expected.put(4L, "D");
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                ReadOnlyKeyValueStore<Long, String> store = null;
+        TestUtils.waitForCondition(
+            () -> {
+                final ReadOnlyKeyValueStore<Long, String> store;
                 try {
-                    store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+                    store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
                 } catch (final InvalidStateStoreException ex) {
                     return false;
                 }
@@ -258,8 +231,9 @@ public class GlobalKTableEOSIntegrationTest {
                     result.put(kv.key, kv.value);
                 }
                 return result.equals(expected);
-            }
-        }, 30000L, "waiting for initial values");
+            },
+            30000L,
+            "waiting for initial values");
     }
     
     @Test
@@ -276,12 +250,11 @@ public class GlobalKTableEOSIntegrationTest {
         expected.put(3L, "C");
         expected.put(4L, "D");
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                ReadOnlyKeyValueStore<Long, String> store = null;
+        TestUtils.waitForCondition(
+            () -> {
+                final ReadOnlyKeyValueStore<Long, String> store;
                 try {
-                    store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+                    store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
                 } catch (final InvalidStateStoreException ex) {
                     return false;
                 }
@@ -292,11 +265,12 @@ public class GlobalKTableEOSIntegrationTest {
                     result.put(kv.key, kv.value);
                 }
                 return result.equals(expected);
-            }
-        }, 30000L, "waiting for initial values");
+            },
+            30000L,
+            "waiting for initial values");
     }
 
-    private void createTopics() throws InterruptedException {
+    private void createTopics() throws Exception {
         streamTopic = "stream-" + testNo;
         globalTableTopic = "globalTable-" + testNo;
         CLUSTER.createTopics(streamTopic);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 013e2b6..2190d70 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -40,7 +40,6 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -48,7 +47,6 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -62,23 +60,12 @@ public class GlobalKTableIntegrationTest {
     private static final int NUM_BROKERS = 1;
 
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER =
-            new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
     private static volatile int testNo = 0;
     private final MockTime mockTime = CLUSTER.time;
-    private final KeyValueMapper<String, Long, Long> keyMapper = new KeyValueMapper<String, Long, Long>() {
-        @Override
-        public Long apply(final String key, final Long value) {
-            return value;
-        }
-    };
-    private final ValueJoiner<Long, String, String> joiner = new ValueJoiner<Long, String, String>() {
-        @Override
-        public String apply(final Long value1, final String value2) {
-            return value1 + "+" + value2;
-        }
-    };
+    private final KeyValueMapper<String, Long, Long> keyMapper = (key, value) -> value;
+    private final ValueJoiner<Long, String, String> joiner = (value1, value2) -> value1 + "+" + value2;
     private final String globalStore = "globalStore";
     private final Map<String, String> results = new HashMap<>();
     private StreamsBuilder builder;
@@ -91,15 +78,14 @@ public class GlobalKTableIntegrationTest {
     private ForeachAction<String, String> foreachAction;
 
     @Before
-    public void before() throws InterruptedException {
+    public void before() throws Exception {
         testNo++;
         builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
         final String applicationId = "globalTableTopic-table-test-" + testNo;
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-        streamsConfiguration
-                .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
@@ -111,16 +97,11 @@ public class GlobalKTableIntegrationTest {
                                                   .withValueSerde(Serdes.String()));
         final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
         stream = builder.stream(streamTopic, stringLongConsumed);
-        foreachAction = new ForeachAction<String, String>() {
-            @Override
-            public void apply(final String key, final String value) {
-                results.put(key, value);
-            }
-        };
+        foreachAction = results::put;
     }
 
     @After
-    public void whenShuttingDown() throws IOException {
+    public void whenShuttingDown() throws Exception {
         if (kafkaStreams != null) {
             kafkaStreams.close();
         }
@@ -142,24 +123,22 @@ public class GlobalKTableIntegrationTest {
         expected.put("d", "4+D");
         expected.put("e", "5+null");
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return results.equals(expected);
-            }
-        }, 30000L, "waiting for initial values");
+        TestUtils.waitForCondition(
+            () -> results.equals(expected),
+            30000L,
+            "waiting for initial values");
 
 
         produceGlobalTableValues();
 
-        final ReadOnlyKeyValueStore<Long, String> replicatedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+        final ReadOnlyKeyValueStore<Long, String> replicatedStore =
+            kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
+
+        TestUtils.waitForCondition(
+            () -> "J".equals(replicatedStore.get(5L)),
+            30000,
+            "waiting for data in replicated store");
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return "J".equals(replicatedStore.get(5L));
-            }
-        }, 30000, "waiting for data in replicated store");
         produceTopicValues(streamTopic);
 
         expected.put("a", "1+F");
@@ -168,12 +147,10 @@ public class GlobalKTableIntegrationTest {
         expected.put("d", "4+I");
         expected.put("e", "5+J");
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return results.equals(expected);
-            }
-        }, 30000L, "waiting for final values");
+        TestUtils.waitForCondition(
+            () -> results.equals(expected),
+            30000L,
+            "waiting for final values");
     }
 
     @Test
@@ -190,24 +167,21 @@ public class GlobalKTableIntegrationTest {
         expected.put("c", "3+C");
         expected.put("d", "4+D");
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return results.equals(expected);
-            }
-        }, 30000L, "waiting for initial values");
+        TestUtils.waitForCondition(
+            () -> results.equals(expected),
+            30000L,
+            "waiting for initial values");
 
 
         produceGlobalTableValues();
 
-        final ReadOnlyKeyValueStore<Long, String> replicatedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+        final ReadOnlyKeyValueStore<Long, String> replicatedStore =
+            kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return "J".equals(replicatedStore.get(5L));
-            }
-        }, 30000, "waiting for data in replicated store");
+        TestUtils.waitForCondition(
+            () -> "J".equals(replicatedStore.get(5L)),
+            30000,
+            "waiting for data in replicated store");
 
         produceTopicValues(streamTopic);
 
@@ -217,12 +191,10 @@ public class GlobalKTableIntegrationTest {
         expected.put("d", "4+I");
         expected.put("e", "5+J");
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return results.equals(expected);
-            }
-        }, 30000L, "waiting for final values");
+        TestUtils.waitForCondition(
+            () -> results.equals(expected),
+            30000L,
+            "waiting for final values");
     }
 
     @Test
@@ -245,7 +217,7 @@ public class GlobalKTableIntegrationTest {
         assertThat(store.approximateNumEntries(), equalTo(4L));
     }
 
-    private void createTopics() throws InterruptedException {
+    private void createTopics() throws Exception {
         streamTopic = "stream-" + testNo;
         globalTableTopic = "globalTable-" + testNo;
         CLUSTER.createTopics(streamTopic);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
index e0bec90..7c16927 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
@@ -17,30 +17,26 @@
 
 package org.apache.kafka.streams.integration;
 
-import java.time.Duration;
+import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -48,14 +44,12 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 
-import kafka.utils.MockTime;
-
 import static org.junit.Assert.assertEquals;
 
 @Category({IntegrationTest.class})
@@ -71,8 +65,7 @@ public class GlobalThreadShutDownOrderTest {
     }
 
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER =
-        new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
 
     private final MockTime mockTime = CLUSTER.time;
     private final String globalStore = "globalStore";
@@ -81,20 +74,17 @@ public class GlobalThreadShutDownOrderTest {
     private KafkaStreams kafkaStreams;
     private String globalStoreTopic;
     private String streamTopic;
-    private KStream<String, Long> stream;
-    private List<Long> retrievedValuesList = new ArrayList<>();
+    private final List<Long> retrievedValuesList = new ArrayList<>();
     private boolean firstRecordProcessed;
 
     @Before
-    public void before() throws InterruptedException {
-
+    public void before() throws Exception {
         builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
         final String applicationId = "global-thread-shutdown-test";
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-        streamsConfiguration
-            .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
@@ -103,29 +93,26 @@ public class GlobalThreadShutDownOrderTest {
 
         final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
 
-        final KeyValueStoreBuilder<String, Long> storeBuilder = new KeyValueStoreBuilder<>(Stores.persistentKeyValueStore(globalStore),
-                                                                                           Serdes.String(),
-                                                                                           Serdes.Long(),
-                                                                                           mockTime);
-
-        builder.addGlobalStore(storeBuilder,
-                               globalStoreTopic,
-                               Consumed.with(Serdes.String(), Serdes.Long()),
-                               new MockProcessorSupplier());
+        final KeyValueStoreBuilder<String, Long> storeBuilder = new KeyValueStoreBuilder<>(
+            Stores.persistentKeyValueStore(globalStore),
+            Serdes.String(),
+            Serdes.Long(),
+            mockTime);
 
-        stream = builder.stream(streamTopic, stringLongConsumed);
+        builder.addGlobalStore(
+            storeBuilder,
+            globalStoreTopic,
+            Consumed.with(Serdes.String(), Serdes.Long()),
+            new MockProcessorSupplier());
 
-        stream.process(new ProcessorSupplier<String, Long>() {
-            @Override
-            public Processor<String, Long> get() {
-                return new GlobalStoreProcessor(globalStore);
-            }
-        });
+        builder
+            .stream(streamTopic, stringLongConsumed)
+            .process(() -> new GlobalStoreProcessor(globalStore));
 
     }
 
     @After
-    public void whenShuttingDown() throws IOException {
+    public void whenShuttingDown() throws Exception {
         if (kafkaStreams != null) {
             kafkaStreams.close();
         }
@@ -141,12 +128,10 @@ public class GlobalThreadShutDownOrderTest {
 
         kafkaStreams.start();
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return firstRecordProcessed;
-            }
-        }, 30000, "Has not processed record within 30 seconds");
+        TestUtils.waitForCondition(
+            () -> firstRecordProcessed,
+            30000,
+            "Has not processed record within 30 seconds");
 
         kafkaStreams.close(Duration.ofSeconds(30));
 
@@ -155,7 +140,7 @@ public class GlobalThreadShutDownOrderTest {
     }
 
 
-    private void createTopics() throws InterruptedException {
+    private void createTopics() throws Exception {
         streamTopic = "stream-topic";
         globalStoreTopic = "global-store-topic";
         CLUSTER.createTopics(streamTopic);
@@ -186,7 +171,6 @@ public class GlobalThreadShutDownOrderTest {
         private final String storeName;
 
         GlobalStoreProcessor(final String storeName) {
-
             this.storeName = storeName;
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index 1707679..32d77c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.streams.integration;
 
 
-import java.time.Duration;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serdes;
@@ -30,13 +29,11 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
 import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -45,7 +42,7 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -95,19 +92,14 @@ public class KTableSourceTopicRestartIntegrationTest {
     @Before
     public void before() {
         final KTable<String, String> kTable = streamsBuilder.table(SOURCE_TOPIC, Materialized.as("store"));
-        kTable.toStream().foreach(new ForeachAction<String, String>() {
-            @Override
-            public void apply(final String key, final String value) {
-                readKeyValues.put(key, value);
-            }
-        });
+        kTable.toStream().foreach(readKeyValues::put);
 
         expectedInitialResultsMap = createExpectedResultsMap("a", "b", "c");
         expectedResultsWithDataWrittenDuringRestoreMap = createExpectedResultsMap("a", "b", "c", "d", "f", "g", "h");
     }
 
     @After
-    public void after() throws IOException {
+    public void after() throws Exception {
         IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
     }
 
@@ -129,7 +121,10 @@ public class KTableSourceTopicRestartIntegrationTest {
 
             produceKeyValues("f", "g", "h");
 
-            assertNumberValuesRead(readKeyValues, expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");
+            assertNumberValuesRead(
+                readKeyValues,
+                expectedResultsWithDataWrittenDuringRestoreMap,
+                "Table did not get all values after restart");
         } finally {
             streamsOne.close(Duration.ofSeconds(5));
         }
@@ -154,7 +149,10 @@ public class KTableSourceTopicRestartIntegrationTest {
 
             produceKeyValues("f", "g", "h");
 
-            assertNumberValuesRead(readKeyValues, expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");
+            assertNumberValuesRead(
+                readKeyValues,
+                expectedResultsWithDataWrittenDuringRestoreMap,
+                "Table did not get all values after restart");
         } finally {
             streamsOne.close(Duration.ofSeconds(5));
         }
@@ -188,12 +186,7 @@ public class KTableSourceTopicRestartIntegrationTest {
                                         final Map<String, String> expectedMap,
                                         final String errorMessage) throws InterruptedException {
         TestUtils.waitForCondition(
-            new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return valueMap.equals(expectedMap);
-                }
-            },
+            () -> valueMap.equals(expectedMap),
             30 * 1000L,
             errorMessage);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index 96d7d14..4c7859b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.integration;
 
-import java.time.Duration;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -44,14 +43,13 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 
 @Category({IntegrationTest.class})
 public class PurgeRepartitionTopicIntegrationTest {
@@ -64,18 +62,18 @@ public class PurgeRepartitionTopicIntegrationTest {
 
     private static AdminClient adminClient;
     private static KafkaStreams kafkaStreams;
-    private static Integer purgeIntervalMs = 10;
-    private static Integer purgeSegmentBytes = 2000;
+    private static final Integer PURGE_INTERVAL_MS = 10;
+    private static final Integer PURGE_SEGMENT_BYTES = 2000;
 
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, new Properties() {
         {
-            put("log.retention.check.interval.ms", purgeIntervalMs);
+            put("log.retention.check.interval.ms", PURGE_INTERVAL_MS);
             put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, 0);
         }
     });
 
-    private Time time = CLUSTER.time;
+    private final Time time = CLUSTER.time;
 
     private class RepartitionTopicCreatedWithExpectedConfigs implements TestCondition {
         @Override
@@ -92,11 +90,14 @@ public class PurgeRepartitionTopicIntegrationTest {
 
             try {
                 final ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, REPARTITION_TOPIC);
-                final Config config = adminClient.describeConfigs(Collections.singleton(resource))
-                        .values().get(resource).get();
+                final Config config = adminClient
+                    .describeConfigs(Collections.singleton(resource))
+                    .values()
+                    .get(resource)
+                    .get();
                 return config.get(TopicConfig.CLEANUP_POLICY_CONFIG).value().equals(TopicConfig.CLEANUP_POLICY_DELETE)
-                        && config.get(TopicConfig.SEGMENT_MS_CONFIG).value().equals(purgeIntervalMs.toString())
-                        && config.get(TopicConfig.SEGMENT_BYTES_CONFIG).value().equals(purgeSegmentBytes.toString());
+                        && config.get(TopicConfig.SEGMENT_MS_CONFIG).value().equals(PURGE_INTERVAL_MS.toString())
+                        && config.get(TopicConfig.SEGMENT_BYTES_CONFIG).value().equals(PURGE_SEGMENT_BYTES.toString());
             } catch (final Exception e) {
                 return false;
             }
@@ -104,7 +105,6 @@ public class PurgeRepartitionTopicIntegrationTest {
     }
 
     private interface TopicSizeVerifier {
-
         boolean verify(long currentSize);
     }
 
@@ -117,18 +117,19 @@ public class PurgeRepartitionTopicIntegrationTest {
 
         @Override
         public final boolean conditionMet() {
-            time.sleep(purgeIntervalMs);
+            time.sleep(PURGE_INTERVAL_MS);
 
             try {
-                final Collection<DescribeLogDirsResponse.LogDirInfo> logDirInfo = adminClient.describeLogDirs(Collections.singleton(0)).values().get(0).get().values();
+                final Collection<DescribeLogDirsResponse.LogDirInfo> logDirInfo =
+                    adminClient.describeLogDirs(Collections.singleton(0)).values().get(0).get().values();
 
                 for (final DescribeLogDirsResponse.LogDirInfo partitionInfo : logDirInfo) {
-                    final DescribeLogDirsResponse.ReplicaInfo replicaInfo = partitionInfo.replicaInfos.get(new TopicPartition(REPARTITION_TOPIC, 0));
+                    final DescribeLogDirsResponse.ReplicaInfo replicaInfo =
+                        partitionInfo.replicaInfos.get(new TopicPartition(REPARTITION_TOPIC, 0));
                     if (replicaInfo != null && verifier.verify(replicaInfo.size)) {
                         return true;
                     }
                 }
-
             } catch (final Exception e) {
                 // swallow
             }
@@ -138,7 +139,7 @@ public class PurgeRepartitionTopicIntegrationTest {
     }
 
     @BeforeClass
-    public static void createTopics() throws InterruptedException {
+    public static void createTopics() throws Exception {
         CLUSTER.createTopic(INPUT_TOPIC, 1, 1);
     }
 
@@ -151,18 +152,17 @@ public class PurgeRepartitionTopicIntegrationTest {
 
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, purgeIntervalMs);
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, PURGE_INTERVAL_MS);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(APPLICATION_ID).getPath());
-        streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), purgeIntervalMs);
-        streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), purgeSegmentBytes);
-        streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), purgeSegmentBytes / 2);    // we cannot allow batch size larger than segment size
+        streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), PURGE_INTERVAL_MS);
+        streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), PURGE_SEGMENT_BYTES);
+        streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), PURGE_SEGMENT_BYTES / 2);    // we cannot allow batch size larger than segment size
         streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         final StreamsBuilder builder = new StreamsBuilder();
-
         builder.stream(INPUT_TOPIC)
                .groupBy(MockMapper.selectKeyKeyValueMapper())
                .count();
@@ -171,15 +171,14 @@ public class PurgeRepartitionTopicIntegrationTest {
     }
 
     @After
-    public void shutdown() throws IOException {
+    public void shutdown() {
         if (kafkaStreams != null) {
             kafkaStreams.close(Duration.ofSeconds(30));
         }
     }
 
-
     @Test
-    public void shouldRestoreState() throws InterruptedException, ExecutionException {
+    public void shouldRestoreState() throws Exception {
         // produce some data to input topic
         final List<KeyValue<Integer, Integer>> messages = new ArrayList<>();
         for (int i = 0; i < 1000; i++) {
@@ -198,26 +197,16 @@ public class PurgeRepartitionTopicIntegrationTest {
                 "Repartition topic " + REPARTITION_TOPIC + " not created with the expected configs after 60000 ms.");
 
         TestUtils.waitForCondition(
-                new RepartitionTopicVerified(new TopicSizeVerifier() {
-                    @Override
-                    public boolean verify(final long currentSize) {
-                        return currentSize > 0;
-                    }
-                }),
-                60000,
-                "Repartition topic " + REPARTITION_TOPIC + " not received data after 60000 ms."
+            new RepartitionTopicVerified(currentSize -> currentSize > 0),
+            60000,
+            "Repartition topic " + REPARTITION_TOPIC + " not received data after 60000 ms."
         );
 
         // we need long enough timeout to by-pass the log manager's InitialTaskDelayMs, which is hard-coded on server side
         TestUtils.waitForCondition(
-                new RepartitionTopicVerified(new TopicSizeVerifier() {
-                    @Override
-                    public boolean verify(final long currentSize) {
-                        return currentSize <= purgeSegmentBytes;
-                    }
-                }),
-                60000,
-                "Repartition topic " + REPARTITION_TOPIC + " not purged data after 60000 ms."
+            new RepartitionTopicVerified(currentSize -> currentSize <= PURGE_SEGMENT_BYTES),
+            60000,
+            "Repartition topic " + REPARTITION_TOPIC + " not purged data after 60000 ms."
         );
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 06014a1..856a85d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -99,8 +99,7 @@ public class QueryableStateIntegrationTest {
     private static final int NUM_BROKERS = 1;
 
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER =
-        new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
     private static final int STREAM_THREE_PARTITIONS = 4;
     private final MockTime mockTime = CLUSTER.time;
     private String streamOne = "stream-one";
@@ -292,27 +291,31 @@ public class QueryableStateIntegrationTest {
                                  final Set<String> keys,
                                  final String storeName) throws Exception {
         for (final String key : keys) {
-            TestUtils.waitForCondition(() -> {
-                try {
-                    final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
-
-                    if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
+            TestUtils.waitForCondition(
+                () -> {
+                    try {
+                        final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
+
+                        if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
+                            return false;
+                        }
+                        final int index = metadata.hostInfo().port();
+                        final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
+                        final ReadOnlyKeyValueStore<String, Long> store =
+                            streamsWithKey.store(storeName, QueryableStoreTypes.keyValueStore());
+
+                        return store != null && store.get(key) != null;
+                    } catch (final IllegalStateException e) {
+                        // Kafka Streams instance may have closed but rebalance hasn't happened
+                        return false;
+                    } catch (final InvalidStateStoreException e) {
+                        // there must have been at least one rebalance state
+                        assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
                         return false;
                     }
-                    final int index = metadata.hostInfo().port();
-                    final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
-                    final ReadOnlyKeyValueStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.keyValueStore());
-
-                    return store != null && store.get(key) != null;
-                } catch (final IllegalStateException e) {
-                    // Kafka Streams instance may have closed but rebalance hasn't happened
-                    return false;
-                } catch (final InvalidStateStoreException e) {
-                    // there must have been at least one rebalance state
-                    assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
-                    return false;
-                }
-            }, 120000, "waiting for metadata, store and value to be non null");
+                },
+                120000,
+                "waiting for metadata, store and value to be non null");
         }
     }
 
@@ -324,25 +327,29 @@ public class QueryableStateIntegrationTest {
                                        final Long from,
                                        final Long to) throws Exception {
         for (final String key : keys) {
-            TestUtils.waitForCondition(() -> {
-                try {
-                    final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
-                    if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
+            TestUtils.waitForCondition(
+                () -> {
+                    try {
+                        final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
+                        if (metadata == null || metadata.equals(StreamsMetadata.NOT_AVAILABLE)) {
+                            return false;
+                        }
+                        final int index = metadata.hostInfo().port();
+                        final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
+                        final ReadOnlyWindowStore<String, Long> store =
+                            streamsWithKey.store(storeName, QueryableStoreTypes.windowStore());
+                        return store != null && store.fetch(key, ofEpochMilli(from), ofEpochMilli(to)) != null;
+                    } catch (final IllegalStateException e) {
+                        // Kafka Streams instance may have closed but rebalance hasn't happened
+                        return false;
+                    } catch (final InvalidStateStoreException e) {
+                        // there must have been at least one rebalance state
+                        assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
                         return false;
                     }
-                    final int index = metadata.hostInfo().port();
-                    final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
-                    final ReadOnlyWindowStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.windowStore());
-                    return store != null && store.fetch(key, ofEpochMilli(from), ofEpochMilli(to)) != null;
-                } catch (final IllegalStateException e) {
-                    // Kafka Streams instance may have closed but rebalance hasn't happened
-                    return false;
-                } catch (final InvalidStateStoreException e) {
-                    // there must have been at least one rebalance state
-                    assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
-                    return false;
-                }
-            }, 120000, "waiting for metadata, store and value to be non null");
+                },
+                120000,
+                "waiting for metadata, store and value to be non null");
         }
     }
 
@@ -359,7 +366,13 @@ public class QueryableStateIntegrationTest {
         final String storeName = "word-count-store";
         final String windowStoreName = "windowed-word-count-store";
         for (int i = 0; i < numThreads; i++) {
-            streamRunnables[i] = new StreamRunnable(streamThree, outputTopicThree, outputTopicConcurrentWindowed, storeName, windowStoreName, i);
+            streamRunnables[i] = new StreamRunnable(
+                streamThree,
+                outputTopicThree,
+                outputTopicConcurrentWindowed,
+                storeName,
+                windowStoreName,
+                i);
             streamThreads[i] = new Thread(streamRunnables[i]);
             streamThreads[i].start();
         }
@@ -368,10 +381,20 @@ public class QueryableStateIntegrationTest {
             waitUntilAtLeastNumRecordProcessed(outputTopicThree, 1);
 
             for (int i = 0; i < numThreads; i++) {
-                verifyAllKVKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), inputValuesKeys,
+                verifyAllKVKeys(
+                    streamRunnables,
+                    streamRunnables[i].getStream(),
+                    streamRunnables[i].getStateListener(),
+                    inputValuesKeys,
                     storeName + "-" + streamThree);
-                verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), streamRunnables[i].getStateListener(), inputValuesKeys,
-                    windowStoreName + "-" + streamThree, 0L, WINDOW_SIZE);
+                verifyAllWindowedKeys(
+                    streamRunnables,
+                    streamRunnables[i].getStream(),
+                    streamRunnables[i].getStateListener(),
+                    inputValuesKeys,
+                    windowStoreName + "-" + streamThree,
+                    0L,
+                    WINDOW_SIZE);
                 assertEquals(KafkaStreams.State.RUNNING, streamRunnables[i].getStream().state());
             }
 
@@ -383,10 +406,20 @@ public class QueryableStateIntegrationTest {
             }
 
             // query from the remaining thread
-            verifyAllKVKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), inputValuesKeys,
+            verifyAllKVKeys(
+                streamRunnables,
+                streamRunnables[0].getStream(),
+                streamRunnables[0].getStateListener(),
+                inputValuesKeys,
                 storeName + "-" + streamThree);
-            verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), streamRunnables[0].getStateListener(), inputValuesKeys,
-                windowStoreName + "-" + streamThree, 0L, WINDOW_SIZE);
+            verifyAllWindowedKeys(
+                streamRunnables,
+                streamRunnables[0].getStream(),
+                streamRunnables[0].getStateListener(),
+                inputValuesKeys,
+                windowStoreName + "-" + streamThree,
+                0L,
+                WINDOW_SIZE);
             assertEquals(KafkaStreams.State.RUNNING, streamRunnables[0].getStream().state());
         } finally {
             for (int i = 0; i < numThreads; i++) {
@@ -407,7 +440,13 @@ public class QueryableStateIntegrationTest {
 
         final ProducerRunnable producerRunnable = new ProducerRunnable(streamConcurrent, inputValues, numIterations);
         final Thread producerThread = new Thread(producerRunnable);
-        kafkaStreams = createCountStream(streamConcurrent, outputTopicConcurrent, outputTopicConcurrentWindowed, storeName, windowStoreName, streamsConfiguration);
+        kafkaStreams = createCountStream(
+            streamConcurrent,
+            outputTopicConcurrent,
+            outputTopicConcurrentWindowed,
+            storeName,
+            windowStoreName,
+            streamsConfiguration);
 
         kafkaStreams.start();
         producerThread.start();
@@ -416,8 +455,8 @@ public class QueryableStateIntegrationTest {
             waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, numberOfWordsPerIteration);
             waitUntilAtLeastNumRecordProcessed(outputTopicConcurrentWindowed, numberOfWordsPerIteration);
 
-            final ReadOnlyKeyValueStore<String, Long>
-                keyValueStore = kafkaStreams.store(storeName + "-" + streamConcurrent, QueryableStoreTypes.keyValueStore());
+            final ReadOnlyKeyValueStore<String, Long> keyValueStore =
+                kafkaStreams.store(storeName + "-" + streamConcurrent, QueryableStoreTypes.keyValueStore());
 
             final ReadOnlyWindowStore<String, Long> windowStore =
                 kafkaStreams.store(windowStoreName + "-" + streamConcurrent, QueryableStoreTypes.windowStore());
@@ -459,7 +498,8 @@ public class QueryableStateIntegrationTest {
                 new KeyValue<>(keys[3], 5L),
                 new KeyValue<>(keys[4], 2L))
         );
-        final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));
+        final Set<KeyValue<String, Long>> expectedBatch1 =
+            new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamOne,
@@ -535,7 +575,10 @@ public class QueryableStateIntegrationTest {
             mockTime);
 
         final KTable<String, String> t1 = builder.table(streamOne);
-        t1.mapValues((ValueMapper<String, Long>) Long::valueOf, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()))
+        t1
+            .mapValues(
+                (ValueMapper<String, Long>) Long::valueOf,
+                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()))
             .toStream()
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
@@ -544,9 +587,8 @@ public class QueryableStateIntegrationTest {
 
         waitUntilAtLeastNumRecordProcessed(outputTopic, 5);
 
-        final ReadOnlyKeyValueStore<String, Long>
-            myMapStore = kafkaStreams.store("queryMapValues",
-            QueryableStoreTypes.keyValueStore());
+        final ReadOnlyKeyValueStore<String, Long> myMapStore =
+            kafkaStreams.store("queryMapValues", QueryableStoreTypes.keyValueStore());
         for (final KeyValue<String, String> batchEntry : batch1) {
             assertEquals(Long.valueOf(batchEntry.value), myMapStore.get(batchEntry.key));
         }
@@ -566,7 +608,8 @@ public class QueryableStateIntegrationTest {
                 new KeyValue<>(keys[3], "5"),
                 new KeyValue<>(keys[4], "2"))
         );
-        final Set<KeyValue<String, Long>> expectedBatch1 = new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));
+        final Set<KeyValue<String, Long>> expectedBatch1 =
+            new HashSet<>(Collections.singleton(new KeyValue<>(keys[4], 2L)));
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamOne,
@@ -581,7 +624,10 @@ public class QueryableStateIntegrationTest {
         final Predicate<String, String> filterPredicate = (key, value) -> key.contains("kafka");
         final KTable<String, String> t1 = builder.table(streamOne);
         final KTable<String, String> t2 = t1.filter(filterPredicate, Materialized.as("queryFilter"));
-        final KTable<String, Long> t3 = t2.mapValues((ValueMapper<String, Long>) Long::valueOf, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()));
+        final KTable<String, Long> t3 = t2
+            .mapValues(
+                (ValueMapper<String, Long>) Long::valueOf,
+                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("queryMapValues").withValueSerde(Serdes.Long()));
         t3.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
 
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
@@ -596,7 +642,8 @@ public class QueryableStateIntegrationTest {
             assertEquals(myMapStore.get(expectedEntry.key), expectedEntry.value);
         }
         for (final KeyValue<String, String> batchEntry : batch1) {
-            final KeyValue<String, Long> batchEntryMapValue = new KeyValue<>(batchEntry.key, Long.valueOf(batchEntry.value));
+            final KeyValue<String, Long> batchEntryMapValue =
+                new KeyValue<>(batchEntry.key, Long.valueOf(batchEntry.value));
             if (!expectedBatch1.contains(batchEntryMapValue)) {
                 assertNull(myMapStore.get(batchEntry.key));
             }
@@ -685,11 +732,19 @@ public class QueryableStateIntegrationTest {
                 mockTime);
 
         final int maxWaitMs = 30000;
-        TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
 
-        final ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore());
+        TestUtils.waitForCondition(
+            new WaitForStore(storeName),
+            maxWaitMs,
+            "waiting for store " + storeName);
 
-        TestUtils.waitForCondition(() -> new Long(8).equals(store.get("hello")), maxWaitMs, "wait for count to be 8");
+        final ReadOnlyKeyValueStore<String, Long> store =
+            kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore());
+
+        TestUtils.waitForCondition(
+            () -> new Long(8).equals(store.get("hello")),
+            maxWaitMs,
+            "wait for count to be 8");
 
         // close stream
         kafkaStreams.close();
@@ -699,14 +754,19 @@ public class QueryableStateIntegrationTest {
         kafkaStreams.start();
 
         // make sure we never get any value other than 8 for hello
-        TestUtils.waitForCondition(() -> {
-            try {
-                assertEquals(Long.valueOf(8L), kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()).get("hello"));
-                return true;
-            } catch (final InvalidStateStoreException ise) {
-                return false;
-            }
-        }, maxWaitMs, "waiting for store " + storeName);
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    assertEquals(
+                        Long.valueOf(8L),
+                        kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore()).get("hello"));
+                    return true;
+                } catch (final InvalidStateStoreException ise) {
+                    return false;
+                }
+            },
+            maxWaitMs,
+            "waiting for store " + storeName);
 
     }
 
@@ -756,7 +816,11 @@ public class QueryableStateIntegrationTest {
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamOne,
-            Arrays.asList(KeyValue.pair("a", "1"), KeyValue.pair("a", "2"), KeyValue.pair("b", "3"), KeyValue.pair("b", "4")),
+            Arrays.asList(
+                KeyValue.pair("a", "1"),
+                KeyValue.pair("a", "2"),
+                KeyValue.pair("b", "3"),
+                KeyValue.pair("b", "4")),
             TestUtils.producerConfig(
                 CLUSTER.bootstrapServers(),
                 StringSerializer.class,
@@ -765,9 +829,14 @@ public class QueryableStateIntegrationTest {
             mockTime);
 
         final int maxWaitMs = 30000;
-        TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
 
-        final ReadOnlyKeyValueStore<String, String> store = kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore());
+        TestUtils.waitForCondition(
+            new WaitForStore(storeName),
+            maxWaitMs,
+            "waiting for store " + storeName);
+
+        final ReadOnlyKeyValueStore<String, String> store =
+            kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore());
 
         TestUtils.waitForCondition(
             () -> "12".equals(store.get("a")) && "34".equals(store.get("b")),
@@ -784,21 +853,29 @@ public class QueryableStateIntegrationTest {
                 new Properties()),
             mockTime);
 
-        TestUtils.waitForCondition(failed::get, 30000, "wait for thread to fail");
-
-        TestUtils.waitForCondition(new WaitForStore(storeName), maxWaitMs, "waiting for store " + storeName);
+        TestUtils.waitForCondition(
+            failed::get,
+            maxWaitMs,
+            "wait for thread to fail");
+        TestUtils.waitForCondition(
+            new WaitForStore(storeName),
+            maxWaitMs,
+            "waiting for store " + storeName);
 
-        final ReadOnlyKeyValueStore<String, String> store2 = kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore());
+        final ReadOnlyKeyValueStore<String, String> store2 =
+            kafkaStreams.store(storeName, QueryableStoreTypes.keyValueStore());
 
         try {
-            TestUtils.waitForCondition(() ->
-                ("125".equals(store2.get("a"))
-                || "1225".equals(store2.get("a"))
-                || "12125".equals(store2.get("a")))
-                &&
-                ("34".equals(store2.get("b"))
-                || "344".equals(store2.get("b"))
-                || "3434".equals(store2.get("b"))), maxWaitMs, "wait for agg to be <a,125>||<a,1225>||<a,12125> and <b,34>||<b,344>||<b,3434>");
+            TestUtils.waitForCondition(
+                () -> ("125".equals(store2.get("a"))
+                    || "1225".equals(store2.get("a"))
+                    || "12125".equals(store2.get("a")))
+                    &&
+                    ("34".equals(store2.get("b"))
+                    || "344".equals(store2.get("b"))
+                    || "3434".equals(store2.get("b"))),
+                maxWaitMs,
+                "wait for agg to be <a,125>||<a,1225>||<a,12125> and <b,34>||<b,344>||<b,3434>");
         } catch (final Throwable t) {
             throw new RuntimeException("Store content is a: " + store2.get("a") + "; b: " + store2.get("b"), t);
         }
@@ -914,7 +991,8 @@ public class QueryableStateIntegrationTest {
 
     }
 
-    private void waitUntilAtLeastNumRecordProcessed(final String topic, final int numRecs) throws Exception {
+    private void waitUntilAtLeastNumRecordProcessed(final String topic,
+                                                    final int numRecs) throws Exception {
         final Properties config = new Properties();
         config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "queryable-state-consumer");
@@ -930,7 +1008,8 @@ public class QueryableStateIntegrationTest {
 
     private Set<KeyValue<String, Long>> fetch(final ReadOnlyWindowStore<String, Long> store,
                                               final String key) {
-        final WindowStoreIterator<Long> fetch = store.fetch(key, ofEpochMilli(0), ofEpochMilli(System.currentTimeMillis()));
+        final WindowStoreIterator<Long> fetch =
+            store.fetch(key, ofEpochMilli(0), ofEpochMilli(System.currentTimeMillis()));
         if (fetch.hasNext()) {
             final KeyValue<Long, Long> next = fetch.next();
             return Collections.singleton(KeyValue.pair(key, next.value));
@@ -940,7 +1019,8 @@ public class QueryableStateIntegrationTest {
 
     private Map<String, Long> fetchMap(final ReadOnlyWindowStore<String, Long> store,
                                        final String key) {
-        final WindowStoreIterator<Long> fetch = store.fetch(key, ofEpochMilli(0), ofEpochMilli(System.currentTimeMillis()));
+        final WindowStoreIterator<Long> fetch =
+            store.fetch(key, ofEpochMilli(0), ofEpochMilli(System.currentTimeMillis()));
         if (fetch.hasNext()) {
             final KeyValue<Long, Long> next = fetch.next();
             return Collections.singletonMap(key, next.value);
@@ -958,7 +1038,9 @@ public class QueryableStateIntegrationTest {
         private int currIteration = 0;
         boolean shutdown = false;
 
-        ProducerRunnable(final String topic, final List<String> inputValues, final int numIterations) {
+        ProducerRunnable(final String topic,
+                         final List<String> inputValues,
+                         final int numIterations) {
             this.topic = topic;
             this.inputValues = inputValues;
             this.numIterations = numIterations;


Mime
View raw message