kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch trunk updated: MINOR: code cleanup (#6055)
Date Wed, 09 Jan 2019 14:03:39 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6e7149b  MINOR: code cleanup (#6055)
6e7149b is described below

commit 6e7149b77a10ac6aa4da2edd549f468015170236
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Wed Jan 9 15:03:28 2019 +0100

    MINOR: code cleanup (#6055)
    
    Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
---
 .../internals/CompositeRestoreListenerTest.java    |   6 +-
 .../internals/GlobalStreamThreadTest.java          |  67 ++--
 .../internals/ProcessorStateManagerTest.java       |   4 +-
 .../processor/internals/StateRestorerTest.java     |  20 +-
 .../internals/StoreChangelogReaderTest.java        | 297 +++++++++++----
 .../processor/internals/StreamThreadTest.java      | 419 +++++++++++----------
 6 files changed, 515 insertions(+), 298 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
index 5bfa4a6..5d7078c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.test.MockBatchingStateRestoreListener;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.junit.Test;
 
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Collections;
 
@@ -46,8 +46,8 @@ public class CompositeRestoreListenerTest {
         noListenBatchingStateRestoreCallback =
         new MockNoListenBatchingStateRestoreCallback();
     private final MockStateRestoreListener reportingStoreListener = new MockStateRestoreListener();
-    private final byte[] key = "key".getBytes(Charset.forName("UTF-8"));
-    private final byte[] value = "value".getBytes(Charset.forName("UTF-8"));
+    private final byte[] key = "key".getBytes(StandardCharsets.UTF_8);
+    private final byte[] value = "value".getBytes(StandardCharsets.UTF_8);
     private final Collection<KeyValue<byte[], byte[]>> records = Collections.singletonList(KeyValue.pair(key, value));
     private final Collection<ConsumerRecord<byte[], byte[]>> consumerRecords = Collections.singletonList(
         new ConsumerRecord<>("", 0, 0L, key, value)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 4e6023f..c0e0de3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.MockStateRestoreListener;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -163,7 +162,7 @@ public class GlobalStreamThreadTest {
     }
 
     @Test(timeout = 30000)
-    public void shouldStopRunningWhenClosedByUser() throws InterruptedException {
+    public void shouldStopRunningWhenClosedByUser() throws Exception {
         initializeConsumer();
         globalStreamThread.start();
         globalStreamThread.shutdown();
@@ -172,7 +171,7 @@ public class GlobalStreamThreadTest {
     }
 
     @Test
-    public void shouldCloseStateStoresOnClose() throws InterruptedException {
+    public void shouldCloseStateStoresOnClose() throws Exception {
         initializeConsumer();
         globalStreamThread.start();
         final StateStore globalStore = builder.globalStateStores().get(GLOBAL_STORE_NAME);
@@ -184,7 +183,7 @@ public class GlobalStreamThreadTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldTransitionToDeadOnClose() throws InterruptedException {
+    public void shouldTransitionToDeadOnClose() throws Exception {
         initializeConsumer();
         globalStreamThread.start();
         globalStreamThread.shutdown();
@@ -195,7 +194,7 @@ public class GlobalStreamThreadTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldStayDeadAfterTwoCloses() throws InterruptedException {
+    public void shouldStayDeadAfterTwoCloses() throws Exception {
         initializeConsumer();
         globalStreamThread.start();
         globalStreamThread.shutdown();
@@ -207,15 +206,15 @@ public class GlobalStreamThreadTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldTransitionToRunningOnStart() throws InterruptedException {
+    public void shouldTransitionToRunningOnStart() throws Exception {
         initializeConsumer();
         globalStreamThread.start();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return globalStreamThread.state() == RUNNING;
-            }
-        }, 10 * 1000, "Thread never started.");
+
+        TestUtils.waitForCondition(
+            () -> globalStreamThread.state() == RUNNING,
+            10 * 1000,
+            "Thread never started.");
+
         globalStreamThread.shutdown();
     }
 
@@ -223,22 +222,19 @@ public class GlobalStreamThreadTest {
     public void shouldDieOnInvalidOffsetException() throws Exception {
         initializeConsumer();
         globalStreamThread.start();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return globalStreamThread.state() == RUNNING;
-            }
-        }, 10 * 1000, "Thread never started.");
+
+        TestUtils.waitForCondition(
+            () -> globalStreamThread.state() == RUNNING,
+            10 * 1000,
+            "Thread never started.");
 
         mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition, 1L));
         mockConsumer.addRecord(new ConsumerRecord<>(GLOBAL_STORE_TOPIC_NAME, 0, 0L, "K1".getBytes(), "V1".getBytes()));
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return mockConsumer.position(topicPartition) == 1L;
-            }
-        }, 10 * 1000, "Input record never consumed");
+        TestUtils.waitForCondition(
+            () -> mockConsumer.position(topicPartition) == 1L,
+            10 * 1000,
+            "Input record never consumed");
 
         mockConsumer.setException(new InvalidOffsetException("Try Again!") {
             @Override
@@ -249,20 +245,21 @@ public class GlobalStreamThreadTest {
         // feed first record for recovery
         mockConsumer.addRecord(new ConsumerRecord<>(GLOBAL_STORE_TOPIC_NAME, 0, 0L, "K1".getBytes(), "V1".getBytes()));
 
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return globalStreamThread.state() == DEAD;
-            }
-        }, 10 * 1000, "GlobalStreamThread should have died.");
+        TestUtils.waitForCondition(
+            () -> globalStreamThread.state() == DEAD,
+            10 * 1000,
+            "GlobalStreamThread should have died.");
     }
 
     private void initializeConsumer() {
-        mockConsumer.updatePartitions(GLOBAL_STORE_TOPIC_NAME, Collections.singletonList(new PartitionInfo(GLOBAL_STORE_TOPIC_NAME,
-            0,
-            null,
-            new Node[0],
-            new Node[0])));
+        mockConsumer.updatePartitions(
+            GLOBAL_STORE_TOPIC_NAME,
+            Collections.singletonList(new PartitionInfo(
+                GLOBAL_STORE_TOPIC_NAME,
+                0,
+                null,
+                new Node[0],
+                new Node[0])));
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
         mockConsumer.updateEndOffsets(Collections.singletonMap(topicPartition, 0L));
         mockConsumer.assign(Collections.singleton(topicPartition));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 475b5e9..90f2420 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -41,7 +41,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -82,7 +82,7 @@ public class ProcessorStateManagerTest {
     private final MockChangelogReader changelogReader = new MockChangelogReader();
     private final MockKeyValueStore mockKeyValueStore = new MockKeyValueStore(storeName, true);
     private final byte[] key = new byte[]{0x0, 0x0, 0x0, 0x1};
-    private final byte[] value = "the-value".getBytes(Charset.forName("UTF-8"));
+    private final byte[] value = "the-value".getBytes(StandardCharsets.UTF_8);
     private final ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>(changelogTopic, 0, 0, key, value);
     private final LogContext logContext = new LogContext("process-state-manager-test ");
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
index dc22bb4..3fa4b1e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
@@ -35,8 +35,13 @@ public class StateRestorerTest {
     private final MockRestoreCallback callback = new MockRestoreCallback();
     private final MockStateRestoreListener reportingListener = new MockStateRestoreListener();
     private final CompositeRestoreListener compositeRestoreListener = new CompositeRestoreListener(callback);
-    private final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), compositeRestoreListener,
-                                                             null, OFFSET_LIMIT, true, "storeName");
+    private final StateRestorer restorer = new StateRestorer(
+        new TopicPartition("topic", 1),
+        compositeRestoreListener,
+        null,
+        OFFSET_LIMIT,
+        true,
+        "storeName");
 
     @Before
     public void setUp() {
@@ -66,10 +71,13 @@ public class StateRestorerTest {
 
     @Test
     public void shouldBeCompletedIfOffsetAndOffsetLimitAreZero() {
-        final StateRestorer
-            restorer =
-            new StateRestorer(new TopicPartition("topic", 1), compositeRestoreListener, null, 0, true,
-                              "storeName");
+        final StateRestorer restorer = new StateRestorer(
+            new TopicPartition("topic", 1),
+            compositeRestoreListener,
+            null,
+            0,
+            true,
+            "storeName");
         assertTrue(restorer.hasCompleted(0, 10));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index d08f0d7..ebe50c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -73,7 +73,11 @@ public class StoreChangelogReaderTest {
     private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
     private final TopicPartition topicPartition = new TopicPartition("topic", 0);
     private final LogContext logContext = new LogContext("test-reader ");
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Duration.ZERO, stateRestoreListener, logContext);
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(
+        consumer,
+        Duration.ZERO,
+        stateRestoreListener,
+        logContext);
 
     @Before
     public void setUp() {
@@ -91,8 +95,18 @@ public class StoreChangelogReaderTest {
             }
         };
 
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Duration.ZERO, stateRestoreListener, logContext);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(
+            consumer,
+            Duration.ZERO,
+            stateRestoreListener,
+            logContext);
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            Long.MAX_VALUE,
+            true,
+            "storeName"));
         changelogReader.restore(active);
         assertTrue(functionCalled.get());
     }
@@ -123,7 +137,13 @@ public class StoreChangelogReaderTest {
     public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            Long.MAX_VALUE,
+            true,
+            "storeName"));
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active, task);
         changelogReader.restore(active);
@@ -141,8 +161,13 @@ public class StoreChangelogReaderTest {
                 return Collections.singleton(topicPartition);
             }
         });
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
-                                                   "storeName"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            Long.MAX_VALUE,
+            true,
+            "storeName"));
 
         EasyMock.expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         EasyMock.replay(active, task);
@@ -150,8 +175,13 @@ public class StoreChangelogReaderTest {
         // first restore call "fails" but we should not die with an exception
         assertEquals(0, changelogReader.restore(active).size());
 
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
-                "storeName"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            Long.MAX_VALUE,
+            true,
+            "storeName"));
         // retry restore should succeed
         assertEquals(1, changelogReader.restore(active).size());
         assertThat(callback.restored.size(), equalTo(messages));
@@ -167,7 +197,7 @@ public class StoreChangelogReaderTest {
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, (long) (messages + startOffset)));
 
         addRecords(messages, topicPartition, startOffset);
-        consumer.assign(Collections.<TopicPartition>emptyList());
+        consumer.assign(Collections.emptyList());
 
         final StateRestorer stateRestorer = new StateRestorer(
                 topicPartition,
@@ -198,8 +228,13 @@ public class StoreChangelogReaderTest {
     public void shouldRestoreMessagesFromCheckpoint() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, 5L, Long.MAX_VALUE, true,
-                                                   "storeName"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            5L,
+            Long.MAX_VALUE,
+            true,
+            "storeName"));
 
         changelogReader.restore(active);
         assertThat(callback.restored.size(), equalTo(5));
@@ -209,8 +244,13 @@ public class StoreChangelogReaderTest {
     public void shouldClearAssignmentAtEndOfRestore() {
         final int messages = 1;
         setupConsumer(messages, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
-                                                   "storeName"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            Long.MAX_VALUE,
+            true,
+            "storeName"));
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active, task);
         changelogReader.restore(active);
@@ -220,8 +260,13 @@ public class StoreChangelogReaderTest {
     @Test
     public void shouldRestoreToLimitWhenSupplied() {
         setupConsumer(10, topicPartition);
-        final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener, null, 3, true,
-                                                         "storeName");
+        final StateRestorer restorer = new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            3,
+            true,
+            "storeName");
         changelogReader.register(restorer);
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active, task);
@@ -242,9 +287,27 @@ public class StoreChangelogReaderTest {
         setupConsumer(5, one);
         setupConsumer(3, two);
 
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName1"));
-        changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE, true, "storeName2"));
-        changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE, true, "storeName3"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            Long.MAX_VALUE,
+            true,
+            "storeName1"));
+        changelogReader.register(new StateRestorer(
+            one,
+            restoreListener1,
+            null,
+            Long.MAX_VALUE,
+            true,
+            "storeName2"));
+        changelogReader.register(new StateRestorer(
+            two,
+            restoreListener2,
+            null,
+            Long.MAX_VALUE,
+            true,
+            "storeName3"));
 
         expect(active.restoringTaskFor(one)).andStubReturn(task);
         expect(active.restoringTaskFor(two)).andStubReturn(task);
@@ -258,7 +321,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldRestoreAndNotifyMultipleStores() throws Exception {
+    public void shouldRestoreAndNotifyMultipleStores() {
         final TopicPartition one = new TopicPartition("one", 0);
         final TopicPartition two = new TopicPartition("two", 0);
         final MockStateRestoreListener callbackOne = new MockStateRestoreListener();
@@ -269,10 +332,27 @@ public class StoreChangelogReaderTest {
         setupConsumer(5, one);
         setupConsumer(3, two);
 
-        changelogReader
-            .register(new StateRestorer(topicPartition, restoreListener, 0L, Long.MAX_VALUE, true, "storeName1"));
-        changelogReader.register(new StateRestorer(one, restoreListener1, 0L, Long.MAX_VALUE, true, "storeName2"));
-        changelogReader.register(new StateRestorer(two, restoreListener2, 0L, Long.MAX_VALUE, true, "storeName3"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            0L,
+            Long.MAX_VALUE,
+            true,
+            "storeName1"));
+        changelogReader.register(new StateRestorer(
+            one,
+            restoreListener1,
+            0L,
+            Long.MAX_VALUE,
+            true,
+            "storeName2"));
+        changelogReader.register(new StateRestorer(
+            two,
+            restoreListener2,
+            0L,
+            Long.MAX_VALUE,
+            true,
+            "storeName3"));
 
         expect(active.restoringTaskFor(one)).andReturn(task);
         expect(active.restoringTaskFor(two)).andReturn(task);
@@ -300,8 +380,13 @@ public class StoreChangelogReaderTest {
     @Test
     public void shouldOnlyReportTheLastRestoredOffset() {
         setupConsumer(10, topicPartition);
-        changelogReader
-            .register(new StateRestorer(topicPartition, restoreListener, 0L, 5, true, "storeName1"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            0L,
+            5,
+            true,
+            "storeName1"));
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active, task);
         changelogReader.restore(active);
@@ -311,7 +396,6 @@ public class StoreChangelogReaderTest {
         assertCorrectOffsetsReportedByListener(callback, 0L, 4L, 5L);
     }
 
-
     private void assertAllCallbackStatesExecuted(final MockStateRestoreListener restoreListener,
                                                  final String storeName) {
         assertThat(restoreListener.storeNameCalledStates.get(RESTORE_START), equalTo(storeName));
@@ -319,7 +403,6 @@ public class StoreChangelogReaderTest {
         assertThat(restoreListener.storeNameCalledStates.get(RESTORE_END), equalTo(storeName));
     }
 
-
     private void assertCorrectOffsetsReportedByListener(final MockStateRestoreListener restoreListener,
                                                         final long startOffset,
                                                         final long batchOffset,
@@ -332,9 +415,13 @@ public class StoreChangelogReaderTest {
 
     @Test
     public void shouldNotRestoreAnythingWhenPartitionIsEmpty() {
-        final StateRestorer
-            restorer =
-            new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName");
+        final StateRestorer restorer = new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            Long.MAX_VALUE,
+            true,
+            "storeName");
         setupConsumer(0, topicPartition);
         changelogReader.register(restorer);
 
@@ -345,11 +432,15 @@ public class StoreChangelogReaderTest {
 
     @Test
     public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() {
-        final Long endOffset = 10L;
+        final long endOffset = 10L;
         setupConsumer(endOffset, topicPartition);
-        final StateRestorer
-            restorer =
-            new StateRestorer(topicPartition, restoreListener, endOffset, Long.MAX_VALUE, true, "storeName");
+        final StateRestorer restorer = new StateRestorer(
+            topicPartition,
+            restoreListener,
+            endOffset,
+            Long.MAX_VALUE,
+            true,
+            "storeName");
 
         changelogReader.register(restorer);
 
@@ -361,7 +452,13 @@ public class StoreChangelogReaderTest {
     @Test
     public void shouldReturnRestoredOffsetsForPersistentStores() {
         setupConsumer(10, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            Long.MAX_VALUE,
+            true,
+            "storeName"));
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active, task);
         changelogReader.restore(active);
@@ -373,12 +470,18 @@ public class StoreChangelogReaderTest {
     @Test
     public void shouldNotReturnRestoredOffsetsForNonPersistentStore() {
         setupConsumer(10, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            Long.MAX_VALUE,
+            false,
+            "storeName"));
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active, task);
         changelogReader.restore(active);
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
-        assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap()));
+        assertThat(restoredOffsets, equalTo(Collections.emptyMap()));
     }
 
     @Test
@@ -386,11 +489,16 @@ public class StoreChangelogReaderTest {
         assignPartition(3, topicPartition);
         final byte[] bytes = new byte[0];
         consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 0, bytes, bytes));
-        consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 1, (byte[]) null, bytes));
+        consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 1, null, bytes));
         consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 2, bytes, bytes));
         consumer.assign(Collections.singletonList(topicPartition));
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false,
-                                                   "storeName"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            Long.MAX_VALUE,
+            false,
+            "storeName"));
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active, task);
         changelogReader.restore(active);
@@ -402,7 +510,13 @@ public class StoreChangelogReaderTest {
     public void shouldCompleteImmediatelyWhenEndOffsetIs0() {
         final Collection<TopicPartition> expected = Collections.singleton(topicPartition);
         setupConsumer(0, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "store"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            Long.MAX_VALUE,
+            true,
+            "store"));
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active, task);
 
@@ -417,7 +531,13 @@ public class StoreChangelogReaderTest {
 
         setupConsumer(1, topicPartition);
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 10L));
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            Long.MAX_VALUE,
+            false,
+            "storeName"));
 
         final TopicPartition postInitialization = new TopicPartition("other", 0);
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
@@ -432,7 +552,13 @@ public class StoreChangelogReaderTest {
         consumer.updateBeginningOffsets(Collections.singletonMap(postInitialization, 0L));
         consumer.updateEndOffsets(Collections.singletonMap(postInitialization, 3L));
 
-        changelogReader.register(new StateRestorer(postInitialization, restoreListener2, null, Long.MAX_VALUE, false, "otherStore"));
+        changelogReader.register(new StateRestorer(
+            postInitialization,
+            restoreListener2,
+            null,
+            Long.MAX_VALUE,
+            false,
+            "otherStore"));
 
         final Collection<TopicPartition> expected = Utils.mkSet(topicPartition, postInitialization);
         consumer.assign(expected);
@@ -442,7 +568,6 @@ public class StoreChangelogReaderTest {
         assertThat(callbackTwo.restored.size(), equalTo(3));
     }
 
-
     @Test
     public void shouldNotThrowTaskMigratedExceptionIfSourceTopicUpdatedDuringRestoreProcess() {
         final int messages = 10;
@@ -450,7 +575,13 @@ public class StoreChangelogReaderTest {
         // in this case first call to endOffsets returns correct value, but a second thread has updated the source topic
         // but since it's a source topic, the second check should not fire hence no exception
         consumer.addEndOffsets(Collections.singletonMap(topicPartition, 15L));
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 9L, true, "storeName"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            9L,
+            true,
+            "storeName"));
 
         expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
@@ -458,7 +589,6 @@ public class StoreChangelogReaderTest {
         changelogReader.restore(active);
     }
 
-
     @Test
     public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSEnabled() {
         final int totalMessages = 10;
@@ -466,7 +596,13 @@ public class StoreChangelogReaderTest {
         // records have offsets of 0..9 10 is commit marker so 11 is end offset
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 11L));
 
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            Long.MAX_VALUE,
+            true,
+            "storeName"));
 
         expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
@@ -475,13 +611,18 @@ public class StoreChangelogReaderTest {
         assertThat(callback.restored.size(), equalTo(10));
     }
 
-
     @Test
     public void shouldNotThrowTaskMigratedExceptionDuringRestoreForChangelogTopicWhenEndOffsetNotExceededEOSDisabled() {
         final int totalMessages = 10;
         setupConsumer(totalMessages, topicPartition);
 
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            Long.MAX_VALUE,
+            true,
+            "storeName"));
 
         expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
@@ -494,7 +635,13 @@ public class StoreChangelogReaderTest {
     public void shouldNotThrowTaskMigratedExceptionIfEndOffsetGetsExceededDuringRestoreForSourceTopic() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            5,
+            true,
+            "storeName"));
 
         expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
@@ -508,7 +655,13 @@ public class StoreChangelogReaderTest {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
 
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 10, true, "storeName"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            10,
+            true,
+            "storeName"));
 
         expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
@@ -525,11 +678,17 @@ public class StoreChangelogReaderTest {
         addRecords(5, topicPartition, 0);
         //EOS enabled so commit marker at offset 5 so records start at 6
         addRecords(5, topicPartition, 6);
-        consumer.assign(Collections.<TopicPartition>emptyList());
+        consumer.assign(Collections.emptyList());
         // commit marker is 5 so ending offset is 12
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 12L));
 
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 6, true, "storeName"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            6,
+            true,
+            "storeName"));
 
         expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
@@ -545,7 +704,13 @@ public class StoreChangelogReaderTest {
         // records have offsets 0..9 10 is commit marker so 11 is ending offset
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 11L));
 
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, 11, true, "storeName"));
+        changelogReader.register(new StateRestorer(
+            topicPartition,
+            restoreListener,
+            null,
+            11,
+            true,
+            "storeName"));
 
         expect(active.restoringTaskFor(topicPartition)).andReturn(task);
         replay(active);
@@ -558,26 +723,32 @@ public class StoreChangelogReaderTest {
                                final TopicPartition topicPartition) {
         assignPartition(messages, topicPartition);
         addRecords(messages, topicPartition, 0);
-        consumer.assign(Collections.<TopicPartition>emptyList());
+        consumer.assign(Collections.emptyList());
     }
 
     private void addRecords(final long messages,
                             final TopicPartition topicPartition,
                             final int startingOffset) {
         for (int i = 0; i < messages; i++) {
-            consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), startingOffset + i, new byte[0], new byte[0]));
+            consumer.addRecord(new ConsumerRecord<>(
+                topicPartition.topic(),
+                topicPartition.partition(),
+                startingOffset + i,
+                new byte[0],
+                new byte[0]));
         }
     }
 
     private void assignPartition(final long messages,
                                  final TopicPartition topicPartition) {
-        consumer.updatePartitions(topicPartition.topic(),
-                                  Collections.singletonList(
-                                      new PartitionInfo(topicPartition.topic(),
-                                                        topicPartition.partition(),
-                                                        null,
-                                                        null,
-                                                        null)));
+        consumer.updatePartitions(
+            topicPartition.topic(),
+            Collections.singletonList(new PartitionInfo(
+                topicPartition.topic(),
+                topicPartition.partition(),
+                null,
+                null,
+                null)));
         consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, Math.max(0, messages)));
         consumer.assign(Collections.singletonList(topicPartition));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index dd311fb..d499b7c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.time.Duration;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -34,7 +33,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Serdes;
@@ -55,7 +53,6 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
@@ -67,7 +64,6 @@ import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
 import org.junit.Assert;
@@ -75,7 +71,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -176,7 +172,7 @@ public class StreamThreadTest {
 
         // assign single partition
         assignedPartitions = singletonList(t1p1);
-        thread.taskManager().setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        thread.taskManager().setAssignmentMetadata(Collections.emptyMap(), Collections.emptyMap());
 
         final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
         mockConsumer.assign(assignedPartitions);
@@ -192,27 +188,23 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void testStateChangeStartClose() throws InterruptedException {
+    public void testStateChangeStartClose() throws Exception {
         final StreamThread thread = createStreamThread(clientId, config, false);
 
         final StateListenerStub stateListener = new StateListenerStub();
         thread.setStateListener(stateListener);
 
         thread.start();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return thread.state() == StreamThread.State.STARTING;
-            }
-        }, 10 * 1000, "Thread never started.");
+        TestUtils.waitForCondition(
+            () -> thread.state() == StreamThread.State.STARTING,
+            10 * 1000,
+            "Thread never started.");
 
         thread.shutdown();
-        TestUtils.waitForCondition(new TestCondition() {
-            @Override
-            public boolean conditionMet() {
-                return thread.state() == StreamThread.State.DEAD;
-            }
-        }, 10 * 1000, "Thread never shut down.");
+        TestUtils.waitForCondition(
+            () -> thread.state() == StreamThread.State.DEAD,
+            10 * 1000,
+            "Thread never shut down.");
 
         thread.shutdown();
         assertEquals(thread.state(), StreamThread.State.DEAD);
@@ -223,9 +215,9 @@ public class StreamThreadTest {
         return new Cluster(
             "mockClusterId",
             singletonList(node),
-            Collections.<PartitionInfo>emptySet(),
-            Collections.<String>emptySet(),
-            Collections.<String>emptySet(),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            Collections.emptySet(),
             node
         );
     }
@@ -289,8 +281,6 @@ public class StreamThreadTest {
                 defaultGroupName, thread.getName())));
     }
 
-
-    @SuppressWarnings({"unchecked", "ThrowableNotThrown"})
     @Test
     public void shouldNotCommitBeforeTheCommitInterval() {
         final long commitInterval = 1000L;
@@ -302,7 +292,8 @@ public class StreamThreadTest {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 1);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics
+            = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -350,7 +341,7 @@ public class StreamThreadTest {
             Collections.singletonMap(
                 new TaskId(0, t1p1.partition()),
                 assignedPartitions),
-            Collections.<TaskId, Set<TopicPartition>>emptyMap());
+            Collections.emptyMap());
 
         final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
         mockConsumer.assign(Collections.singleton(t1p1));
@@ -360,13 +351,13 @@ public class StreamThreadTest {
 
         // processed one record, punctuated after the first record, and hence num.iterations is still 1
         long offset = -1;
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 0, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        addRecord(mockConsumer, ++offset, 0L);
         thread.runOnce();
 
         assertThat(thread.currentNumIterations(), equalTo(1));
 
         // processed one more record without punctuation, and bump num.iterations to 2
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        addRecord(mockConsumer, ++offset, 1L);
         thread.runOnce();
 
         assertThat(thread.currentNumIterations(), equalTo(2));
@@ -382,28 +373,28 @@ public class StreamThreadTest {
         assertThat(thread.currentNumIterations(), equalTo(1));
 
         // processed two records, bumping up iterations to 2
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 5, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 6, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        addRecord(mockConsumer, ++offset, 5L);
+        addRecord(mockConsumer, ++offset, 6L);
         thread.runOnce();
 
         assertThat(thread.currentNumIterations(), equalTo(2));
 
         // stream time based punctutation halves to 1
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 11, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        addRecord(mockConsumer, ++offset, 11L);
         thread.runOnce();
 
         assertThat(thread.currentNumIterations(), equalTo(1));
 
         // processed three records, bumping up iterations to 3 (1 + 2)
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 12, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 13, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 14, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        addRecord(mockConsumer, ++offset, 12L);
+        addRecord(mockConsumer, ++offset, 13L);
+        addRecord(mockConsumer, ++offset, 14L);
         thread.runOnce();
 
         assertThat(thread.currentNumIterations(), equalTo(3));
 
         mockProcessor.requestCommit();
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 15, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        addRecord(mockConsumer, ++offset, 15L);
         thread.runOnce();
 
         // user requested commit should not impact on iteration adjustment
@@ -417,7 +408,6 @@ public class StreamThreadTest {
 
     }
 
-    @SuppressWarnings({"unchecked", "ThrowableNotThrown"})
     @Test
     public void shouldNotCauseExceptionIfNothingCommitted() {
         final long commitInterval = 1000L;
@@ -429,7 +419,8 @@ public class StreamThreadTest {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics
+            = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -453,8 +444,6 @@ public class StreamThreadTest {
         EasyMock.verify(taskManager);
     }
 
-
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldCommitAfterTheCommitInterval() {
         final long commitInterval = 1000L;
@@ -466,7 +455,8 @@ public class StreamThreadTest {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics
+            = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -490,8 +480,9 @@ public class StreamThreadTest {
         EasyMock.verify(taskManager);
     }
 
-    @SuppressWarnings({"ThrowableNotThrown", "unchecked"})
-    private TaskManager mockTaskManagerCommit(final Consumer<byte[], byte[]> consumer, final int numberOfCommits, final int commits) {
+    private TaskManager mockTaskManagerCommit(final Consumer<byte[], byte[]> consumer,
+                                              final int numberOfCommits,
+                                              final int commits) {
         final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
         EasyMock.expect(taskManager.commitAll()).andReturn(commits).times(numberOfCommits);
         EasyMock.replay(taskManager, consumer);
@@ -517,7 +508,7 @@ public class StreamThreadTest {
         activeTasks.put(task1, Collections.singleton(t1p1));
         activeTasks.put(task2, Collections.singleton(t1p2));
 
-        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
 
         final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
         mockConsumer.assign(assignedPartitions);
@@ -543,7 +534,7 @@ public class StreamThreadTest {
         final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true);
 
         thread.setState(StreamThread.State.STARTING);
-        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -554,7 +545,7 @@ public class StreamThreadTest {
         activeTasks.put(task1, Collections.singleton(t1p1));
         activeTasks.put(task2, Collections.singleton(t1p2));
 
-        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
 
         final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
         mockConsumer.assign(assignedPartitions);
@@ -578,7 +569,7 @@ public class StreamThreadTest {
         final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true);
 
         thread.setState(StreamThread.State.STARTING);
-        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
+        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -589,7 +580,7 @@ public class StreamThreadTest {
         activeTasks.put(task1, Collections.singleton(t1p1));
         activeTasks.put(task2, Collections.singleton(t1p2));
 
-        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
         final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
         mockConsumer.assign(assignedPartitions);
         final Map<TopicPartition, Long> beginOffsets = new HashMap<>();
@@ -607,7 +598,6 @@ public class StreamThreadTest {
         }
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldShutdownTaskManagerOnClose() {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
@@ -616,7 +606,8 @@ public class StreamThreadTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics
+            = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -632,19 +623,15 @@ public class StreamThreadTest {
             new AtomicInteger()
         );
         thread.setStateListener(
-            new StreamThread.StateListener() {
-                @Override
-                public void onChange(final Thread t, final ThreadStateTransitionValidator newState, final ThreadStateTransitionValidator oldState) {
-                    if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.STARTING) {
-                        thread.shutdown();
-                    }
+            (t, newState, oldState) -> {
+                if (oldState == StreamThread.State.CREATED && newState == StreamThread.State.STARTING) {
+                    thread.shutdown();
                 }
             });
         thread.run();
         EasyMock.verify(taskManager);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldShutdownTaskManagerOnCloseWithoutStart() {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
@@ -653,7 +640,8 @@ public class StreamThreadTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics
+            = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -672,7 +660,6 @@ public class StreamThreadTest {
         EasyMock.verify(taskManager);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void shouldOnlyShutdownOnce() {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
@@ -681,7 +668,8 @@ public class StreamThreadTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics
+            = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -717,10 +705,10 @@ public class StreamThreadTest {
         // assign single partition
         standbyTasks.put(task1, Collections.singleton(t1p1));
 
-        thread.taskManager().setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks);
-        thread.taskManager().createTasks(Collections.<TopicPartition>emptyList());
+        thread.taskManager().setAssignmentMetadata(Collections.emptyMap(), standbyTasks);
+        thread.taskManager().createTasks(Collections.emptyList());
 
-        thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
     }
 
     @Test
@@ -744,7 +732,7 @@ public class StreamThreadTest {
         assignedPartitions.add(t1p1);
         activeTasks.put(task1, Collections.singleton(t1p1));
 
-        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
 
         final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
         mockConsumer.assign(assignedPartitions);
@@ -768,12 +756,7 @@ public class StreamThreadTest {
         assertFalse(producer.transactionCommitted());
         mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L);
         TestUtils.waitForCondition(
-            new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return producer.commitCount() == 1;
-                }
-            },
+            () -> producer.commitCount() == 1,
             "StreamsThread did not commit transaction.");
 
         producer.fenceProducer();
@@ -784,12 +767,7 @@ public class StreamThreadTest {
             fail("Should have thrown TaskMigratedException");
         } catch (final TaskMigratedException expected) { /* ignore */ }
         TestUtils.waitForCondition(
-            new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return thread.tasks().isEmpty();
-                }
-            },
+            () -> thread.tasks().isEmpty(),
             "StreamsThread did not remove fenced zombie task.");
 
         assertThat(producer.commitCount(), equalTo(1L));
@@ -812,7 +790,7 @@ public class StreamThreadTest {
         assignedPartitions.add(t1p1);
         activeTasks.put(task1, Collections.singleton(t1p1));
 
-        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
 
         final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
         mockConsumer.assign(assignedPartitions);
@@ -848,7 +826,7 @@ public class StreamThreadTest {
         assignedPartitions.add(t1p1);
         activeTasks.put(task1, Collections.singleton(t1p1));
 
-        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
 
         final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
         mockConsumer.assign(assignedPartitions);
@@ -904,7 +882,7 @@ public class StreamThreadTest {
         assignedPartitions.add(t1p1);
         activeTasks.put(task1, Collections.singleton(t1p1));
 
-        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
 
         final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
         mockConsumer.assign(assignedPartitions);
@@ -922,7 +900,7 @@ public class StreamThreadTest {
     @Test
     public void shouldReturnStandbyTaskMetadataWhileRunningState() {
         internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
-            .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count-one"));
+            .groupByKey().count(Materialized.as("count-one"));
 
         internalStreamsBuilder.buildAndOptimizeTopology();
         final StreamThread thread = createStreamThread(clientId, config, false);
@@ -951,9 +929,9 @@ public class StreamThreadTest {
         // assign single partition
         standbyTasks.put(task1, Collections.singleton(t1p1));
 
-        thread.taskManager().setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks);
+        thread.taskManager().setAssignmentMetadata(Collections.emptyMap(), standbyTasks);
 
-        thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
 
         thread.runOnce();
 
@@ -965,17 +943,20 @@ public class StreamThreadTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldUpdateStandbyTask() throws IOException {
+    public void shouldUpdateStandbyTask() throws Exception {
         final String storeName1 = "count-one";
         final String storeName2 = "table-two";
         final String changelogName1 = applicationId + "-" + storeName1 + "-changelog";
         final String changelogName2 = applicationId + "-" + storeName2 + "-changelog";
         final TopicPartition partition1 = new TopicPartition(changelogName1, 1);
         final TopicPartition partition2 = new TopicPartition(changelogName2, 1);
-        internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
-            .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as(storeName1));
-        final MaterializedInternal materialized = new MaterializedInternal(Materialized.as(storeName2), internalStreamsBuilder, "");
-        internalStreamsBuilder.table(topic2, new ConsumedInternal(), materialized);
+        internalStreamsBuilder
+            .stream(Collections.singleton(topic1), consumed)
+            .groupByKey()
+            .count(Materialized.as(storeName1));
+        final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized
+            = new MaterializedInternal<>(Materialized.as(storeName2), internalStreamsBuilder, "");
+        internalStreamsBuilder.table(topic2, new ConsumedInternal<>(), materialized);
 
         internalStreamsBuilder.buildAndOptimizeTopology();
         final StreamThread thread = createStreamThread(clientId, config, false);
@@ -998,12 +979,23 @@ public class StreamThreadTest {
         restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 10L));
         restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2, 0L));
         // let the store1 be restored from 0 to 10; store2 be restored from 5 (checkpointed) to 10
-        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(task3), CHECKPOINT_FILE_NAME));
+        final OffsetCheckpoint checkpoint
+            = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(task3), CHECKPOINT_FILE_NAME));
         checkpoint.write(Collections.singletonMap(partition2, 5L));
 
         for (long i = 0L; i < 10L; i++) {
-            restoreConsumer.addRecord(new ConsumerRecord<>(changelogName1, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
-            restoreConsumer.addRecord(new ConsumerRecord<>(changelogName2, 1, i, ("K" + i).getBytes(), ("V" + i).getBytes()));
+            restoreConsumer.addRecord(new ConsumerRecord<>(
+                changelogName1,
+                1,
+                i,
+                ("K" + i).getBytes(),
+                ("V" + i).getBytes()));
+            restoreConsumer.addRecord(new ConsumerRecord<>(
+                changelogName2,
+                1,
+                i,
+                ("K" + i).getBytes(),
+                ("V" + i).getBytes()));
         }
 
         thread.setState(StreamThread.State.STARTING);
@@ -1015,9 +1007,9 @@ public class StreamThreadTest {
         standbyTasks.put(task1, Collections.singleton(t1p1));
         standbyTasks.put(task3, Collections.singleton(t2p1));
 
-        thread.taskManager().setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks);
+        thread.taskManager().setAssignmentMetadata(Collections.emptyMap(), standbyTasks);
 
-        thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
+        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
 
         thread.runOnce();
 
@@ -1035,33 +1027,19 @@ public class StreamThreadTest {
     public void shouldPunctuateActiveTask() {
         final List<Long> punctuatedStreamTime = new ArrayList<>();
         final List<Long> punctuatedWallClockTime = new ArrayList<>();
-        final ProcessorSupplier<Object, Object> punctuateProcessor = new ProcessorSupplier<Object, Object>() {
+        final ProcessorSupplier<Object, Object> punctuateProcessor = () -> new Processor<Object, Object>() {
             @Override
-            public Processor<Object, Object> get() {
-                return new Processor<Object, Object>() {
-                    @Override
-                    public void init(final ProcessorContext context) {
-                        context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, new Punctuator() {
-                            @Override
-                            public void punctuate(final long timestamp) {
-                                punctuatedStreamTime.add(timestamp);
-                            }
-                        });
-                        context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
-                            @Override
-                            public void punctuate(final long timestamp) {
-                                punctuatedWallClockTime.add(timestamp);
-                            }
-                        });
-                    }
-
-                    @Override
-                    public void process(final Object key, final Object value) {}
-
-                    @Override
-                    public void close() {}
-                };
+            public void init(final ProcessorContext context) {
+                context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, punctuatedStreamTime::add);
+                context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, punctuatedWallClockTime::add);
             }
+
+            @Override
+            public void process(final Object key,
+                                final Object value) {}
+
+            @Override
+            public void close() {}
         };
 
         internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).process(punctuateProcessor);
@@ -1079,7 +1057,7 @@ public class StreamThreadTest {
         assignedPartitions.add(t1p1);
         activeTasks.put(task1, Collections.singleton(t1p1));
 
-        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
 
         clientSupplier.consumer.assign(assignedPartitions);
         clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
@@ -1092,7 +1070,17 @@ public class StreamThreadTest {
 
         mockTime.sleep(100L);
         for (long i = 0L; i < 10L; i++) {
-            clientSupplier.consumer.addRecord(new ConsumerRecord<>(topic1, 1, i, i * 100L, TimestampType.CREATE_TIME, ConsumerRecord.NULL_CHECKSUM, ("K" + i).getBytes().length, ("V" + i).getBytes().length, ("K" + i).getBytes(), ("V" + i).getBytes()));
+            clientSupplier.consumer.addRecord(new ConsumerRecord<>(
+                topic1,
+                1,
+                i,
+                i * 100L,
+                TimestampType.CREATE_TIME,
+                ConsumerRecord.NULL_CHECKSUM,
+                ("K" + i).getBytes().length,
+                ("V" + i).getBytes().length,
+                ("K" + i).getBytes(),
+                ("V" + i).getBytes()));
         }
 
         thread.runOnce();
@@ -1126,7 +1114,7 @@ public class StreamThreadTest {
     @Test
     public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() {
         internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
-            .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count-one"));
+            .groupByKey().count(Materialized.as("count-one"));
         internalStreamsBuilder.buildAndOptimizeTopology();
 
         final StreamThread thread = createStreamThread(clientId, config, false);
@@ -1176,7 +1164,7 @@ public class StreamThreadTest {
     @Test
     public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() throws Exception {
         internalStreamsBuilder.stream(Collections.singleton("topic"), consumed)
-            .groupByKey().count(Materialized.<Object, Long, KeyValueStore<Bytes, byte[]>>as("count"));
+            .groupByKey().count(Materialized.as("count"));
         internalStreamsBuilder.buildAndOptimizeTopology();
 
         final StreamThread thread = createStreamThread("clientId", config, false);
@@ -1188,7 +1176,7 @@ public class StreamThreadTest {
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
         activeTasks.put(new TaskId(0, 0), topicPartitionSet);
-        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+        thread.taskManager().setAssignmentMetadata(activeTasks, Collections.emptyMap());
 
         mockConsumer.updatePartitions(
             "topic",
@@ -1222,32 +1210,28 @@ public class StreamThreadTest {
         mockRestoreConsumer.updateBeginningOffsets(Collections.singletonMap(changelogPartition, 0L));
         mockRestoreConsumer.updateEndOffsets(Collections.singletonMap(changelogPartition, 2L));
 
-        mockConsumer.schedulePollTask(new Runnable() {
-            @Override
-            public void run() {
-                thread.setState(StreamThread.State.PARTITIONS_REVOKED);
-                thread.rebalanceListener.onPartitionsAssigned(topicPartitionSet);
-            }
+        mockConsumer.schedulePollTask(() -> {
+            thread.setState(StreamThread.State.PARTITIONS_REVOKED);
+            thread.rebalanceListener.onPartitionsAssigned(topicPartitionSet);
         });
 
         try {
             thread.start();
 
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return mockRestoreConsumer.assignment().size() == 1;
-                }
-            }, "Never restore first record");
+            TestUtils.waitForCondition(
+                () -> mockRestoreConsumer.assignment().size() == 1,
+                "Never restore first record");
 
-            mockRestoreConsumer.addRecord(new ConsumerRecord<>("stream-thread-test-count-changelog", 0, 0L, "K1".getBytes(), "V1".getBytes()));
+            mockRestoreConsumer.addRecord(new ConsumerRecord<>(
+                "stream-thread-test-count-changelog",
+                0,
+                0L,
+                "K1".getBytes(),
+                "V1".getBytes()));
 
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
-                    return mockRestoreConsumer.position(changelogPartition) == 1L;
-                }
-            }, "Never restore first record");
+            TestUtils.waitForCondition(
+                () -> mockRestoreConsumer.position(changelogPartition) == 1L,
+                "Never restore first record");
 
             mockRestoreConsumer.setException(new InvalidOffsetException("Try Again!") {
                 @Override
@@ -1256,16 +1240,25 @@ public class StreamThreadTest {
                 }
             });
 
-            mockRestoreConsumer.addRecord(new ConsumerRecord<>("stream-thread-test-count-changelog", 0, 0L, "K1".getBytes(), "V1".getBytes()));
-            mockRestoreConsumer.addRecord(new ConsumerRecord<>("stream-thread-test-count-changelog", 0, 1L, "K2".getBytes(), "V2".getBytes()));
+            mockRestoreConsumer.addRecord(new ConsumerRecord<>(
+                "stream-thread-test-count-changelog",
+                0,
+                0L,
+                "K1".getBytes(),
+                "V1".getBytes()));
+            mockRestoreConsumer.addRecord(new ConsumerRecord<>(
+                "stream-thread-test-count-changelog",
+                0,
+                1L,
+                "K2".getBytes(),
+                "V2".getBytes()));
 
-            TestUtils.waitForCondition(new TestCondition() {
-                @Override
-                public boolean conditionMet() {
+            TestUtils.waitForCondition(
+                () -> {
                     mockRestoreConsumer.assign(changelogPartitionSet);
                     return mockRestoreConsumer.position(changelogPartition) == 2L;
-                }
-            }, "Never finished restore");
+                },
+                "Never finished restore");
         } finally {
             thread.shutdown();
             thread.join(10000);
@@ -1279,7 +1272,9 @@ public class StreamThreadTest {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
         final Properties config = configProps(false);
-        config.setProperty(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName());
+        config.setProperty(
+            StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
+            LogAndContinueExceptionHandler.class.getName());
         config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
         final StreamThread thread = createStreamThread(clientId, new StreamsConfig(config), false);
 
@@ -1291,7 +1286,7 @@ public class StreamThreadTest {
             Collections.singletonMap(
                 new TaskId(0, t1p1.partition()),
                 assignedPartitions),
-            Collections.<TaskId, Set<TopicPartition>>emptyMap());
+            Collections.emptyMap());
 
         final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
         mockConsumer.assign(Collections.singleton(t1p1));
@@ -1299,14 +1294,39 @@ public class StreamThreadTest {
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
         thread.runOnce();
 
-        final MetricName skippedTotalMetric = metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
-        final MetricName skippedRateMetric = metrics.metricName("skipped-records-rate", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
+        final MetricName skippedTotalMetric = metrics.metricName(
+            "skipped-records-total",
+            "stream-metrics",
+            Collections.singletonMap("client-id", thread.getName()));
+        final MetricName skippedRateMetric = metrics.metricName(
+            "skipped-records-rate",
+            "stream-metrics",
+            Collections.singletonMap("client-id", thread.getName()));
         assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue());
         assertEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
 
         long offset = -1;
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], "I am not an integer.".getBytes()));
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], "I am not an integer.".getBytes()));
+        mockConsumer.addRecord(new ConsumerRecord<>(
+            t1p1.topic(),
+            t1p1.partition(),
+            ++offset, -1,
+            TimestampType.CREATE_TIME,
+            ConsumerRecord.NULL_CHECKSUM,
+            -1,
+            -1,
+            new byte[0],
+            "I am not an integer.".getBytes()));
+        mockConsumer.addRecord(new ConsumerRecord<>(
+            t1p1.topic(),
+            t1p1.partition(),
+            ++offset,
+            -1,
+            TimestampType.CREATE_TIME,
+            ConsumerRecord.NULL_CHECKSUM,
+            -1,
+            -1,
+            new byte[0],
+            "I am not an integer.".getBytes()));
         thread.runOnce();
         assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue());
         assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
@@ -1324,7 +1344,9 @@ public class StreamThreadTest {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
         final Properties config = configProps(false);
-        config.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, LogAndSkipOnInvalidTimestamp.class.getName());
+        config.setProperty(
+            StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+            LogAndSkipOnInvalidTimestamp.class.getName());
         final StreamThread thread = createStreamThread(clientId, new StreamsConfig(config), false);
 
         thread.setState(StreamThread.State.STARTING);
@@ -1335,7 +1357,7 @@ public class StreamThreadTest {
             Collections.singletonMap(
                 new TaskId(0, t1p1.partition()),
                 assignedPartitions),
-            Collections.<TaskId, Set<TopicPartition>>emptyMap());
+            Collections.emptyMap());
 
         final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
         mockConsumer.assign(Collections.singleton(t1p1));
@@ -1343,28 +1365,34 @@ public class StreamThreadTest {
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
         thread.runOnce();
 
-        final MetricName skippedTotalMetric = metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
-        final MetricName skippedRateMetric = metrics.metricName("skipped-records-rate", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
+        final MetricName skippedTotalMetric = metrics.metricName(
+            "skipped-records-total",
+            "stream-metrics",
+            Collections.singletonMap("client-id", thread.getName()));
+        final MetricName skippedRateMetric = metrics.metricName(
+            "skipped-records-rate",
+            "stream-metrics",
+            Collections.singletonMap("client-id", thread.getName()));
         assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue());
         assertEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
 
         long offset = -1;
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        addRecord(mockConsumer, ++offset);
+        addRecord(mockConsumer, ++offset);
         thread.runOnce();
         assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue());
         assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
 
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        addRecord(mockConsumer, ++offset);
+        addRecord(mockConsumer, ++offset);
+        addRecord(mockConsumer, ++offset);
+        addRecord(mockConsumer, ++offset);
         thread.runOnce();
         assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
         assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
 
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
-        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
+        addRecord(mockConsumer, ++offset, 1L);
+        addRecord(mockConsumer, ++offset, 1L);
         thread.runOnce();
         assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
         assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
@@ -1417,7 +1445,8 @@ public class StreamThreadTest {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics
+            = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
         final StreamThread thread = new StreamThread(
                 mockTime,
                 config,
@@ -1432,18 +1461,13 @@ public class StreamThreadTest {
                 new LogContext(""),
                 new AtomicInteger()
                 );
-        final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<String, String>());
+        final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<>());
         final Metric testMetric = new KafkaMetric(
-                new Object(),
-                testMetricName,
-                new Measurable() {
-                    @Override
-                    public double measure(final MetricConfig config, final long now) {
-                        return 0;
-                    }
-                },
-                null,
-                new MockTime());
+            new Object(),
+            testMetricName,
+            (Measurable) (config, now) -> 0,
+            null,
+            new MockTime());
         producer.setMockMetrics(testMetricName, testMetric);
         final Map<MetricName, Metric> producerMetrics = thread.producerMetrics();
         assertEquals(testMetricName, producerMetrics.get(testMetricName).metricName());
@@ -1461,7 +1485,8 @@ public class StreamThreadTest {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics
+            = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
         final StreamThread thread = new StreamThread(
                 mockTime,
                 config,
@@ -1476,18 +1501,13 @@ public class StreamThreadTest {
                 new LogContext(""),
                 new AtomicInteger()
                 );
-        final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<String, String>());
+        final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<>());
         final Metric testMetric = new KafkaMetric(
-                new Object(),
-                testMetricName,
-                new Measurable() {
-                    @Override
-                    public double measure(final MetricConfig config, final long now) {
-                        return 0;
-                    }
-                },
-                null,
-                new MockTime());
+            new Object(),
+            testMetricName,
+            (Measurable) (config, now) -> 0,
+            null,
+            new MockTime());
 
 
         EasyMock.expect(taskManager.getAdminClient()).andReturn(adminClient);
@@ -1498,4 +1518,25 @@ public class StreamThreadTest {
         final Map<MetricName, Metric> adminClientMetrics = thread.adminClientMetrics();
         assertEquals(testMetricName, adminClientMetrics.get(testMetricName).metricName());
     }
+
+    private void addRecord(final MockConsumer<byte[], byte[]> mockConsumer,
+                           final long offset) {
+        addRecord(mockConsumer, offset, -1L);
+    }
+
+    private void addRecord(final MockConsumer<byte[], byte[]> mockConsumer,
+                           final long offset,
+                           final long timestamp) {
+        mockConsumer.addRecord(new ConsumerRecord<>(
+            t1p1.topic(),
+            t1p1.partition(),
+            offset,
+            timestamp,
+            TimestampType.CREATE_TIME,
+            ConsumerRecord.NULL_CHECKSUM,
+            -1,
+            -1,
+            new byte[0],
+            new byte[0]));
+    }
 }


Mime
View raw message