kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject [3/5] kafka git commit: KAFKA-5531; throw concrete exceptions in streams tests
Date Mon, 11 Sep 2017 08:42:20 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 8aedf36..fbf45b3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -312,7 +312,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception {
+    public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws IOException {
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             new TaskId(0, 1),
             noPartitions,
@@ -326,7 +326,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldNotChangeOffsetsIfAckedOffsetsIsNull() throws Exception {
+    public void shouldNotChangeOffsetsIfAckedOffsetsIsNull() throws IOException {
         final Map<TopicPartition, Long> offsets = Collections.singletonMap(persistentStorePartition, 99L);
         checkpoint.write(offsets);
 
@@ -346,7 +346,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldWriteCheckpointForPersistentLogEnabledStore() throws Exception {
+    public void shouldWriteCheckpointForPersistentLogEnabledStore() throws IOException {
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             taskId,
             noPartitions,
@@ -363,7 +363,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldWriteCheckpointForStandbyReplica() throws Exception {
+    public void shouldWriteCheckpointForStandbyReplica() throws IOException {
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             taskId,
             noPartitions,
@@ -391,7 +391,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldNotWriteCheckpointForNonPersistent() throws Exception {
+    public void shouldNotWriteCheckpointForNonPersistent() throws IOException {
         final TopicPartition topicPartition = new TopicPartition(nonPersistentStoreTopicName, 1);
 
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
@@ -411,7 +411,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws Exception {
+    public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws IOException {
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             taskId,
             noPartitions,
@@ -431,7 +431,7 @@ public class ProcessorStateManagerTest {
 
 
     @Test
-    public void shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName() throws Exception {
+    public void shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName() throws IOException {
         final ProcessorStateManager stateManager = new ProcessorStateManager(
             taskId,
             noPartitions,
@@ -450,7 +450,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered() throws Exception {
+    public void shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered() throws IOException {
         final ProcessorStateManager stateManager = new ProcessorStateManager(
             taskId,
             noPartitions,
@@ -471,7 +471,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() throws Exception {
+    public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() throws IOException {
 
         final ProcessorStateManager stateManager = new ProcessorStateManager(
             taskId,
@@ -499,7 +499,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws Exception {
+    public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws IOException {
         checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
         assertTrue(checkpointFile.exists());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 1ea09cd..da2e5dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -204,7 +204,7 @@ public class ProcessorTopologyTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldDriveGlobalStore() throws Exception {
+    public void shouldDriveGlobalStore() {
         final StateStoreSupplier storeSupplier = Stores.create("my-store")
                 .withStringKeys().withStringValues().inMemory().disableLogging().build();
         final String global = "global";
@@ -271,7 +271,7 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void shouldCreateStringWithSourceAndTopics() throws Exception {
+    public void shouldCreateStringWithSourceAndTopics() {
         builder.addSource("source", "topic1", "topic2");
         final ProcessorTopology topology = builder.build(null);
         final String result = topology.toString();
@@ -279,7 +279,7 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void shouldCreateStringWithMultipleSourcesAndTopics() throws Exception {
+    public void shouldCreateStringWithMultipleSourcesAndTopics() {
         builder.addSource("source", "topic1", "topic2");
         builder.addSource("source2", "t", "t1", "t2");
         final ProcessorTopology topology = builder.build(null);
@@ -289,7 +289,7 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void shouldCreateStringWithProcessors() throws Exception {
+    public void shouldCreateStringWithProcessors() {
         builder.addSource("source", "t")
                 .addProcessor("processor", mockProcessorSupplier, "source")
                 .addProcessor("other", mockProcessorSupplier, "source");
@@ -301,7 +301,7 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void shouldRecursivelyPrintChildren() throws Exception {
+    public void shouldRecursivelyPrintChildren() {
         builder.addSource("source", "t")
                 .addProcessor("processor", mockProcessorSupplier, "source")
                 .addProcessor("child-one", mockProcessorSupplier, "processor")

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 1aa7f38..09f46e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -123,7 +123,7 @@ public class RecordCollectorTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception {
+    public void shouldRetryWhenTimeoutExceptionOccursOnSend() {
         final AtomicInteger attempt = new AtomicInteger(0);
         final RecordCollectorImpl collector = new RecordCollectorImpl(
                 new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@@ -144,7 +144,7 @@ public class RecordCollectorTest {
 
     @SuppressWarnings("unchecked")
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionAfterMaxAttempts() throws Exception {
+    public void shouldThrowStreamsExceptionAfterMaxAttempts() {
         final RecordCollector collector = new RecordCollectorImpl(
                 new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                     @Override
@@ -160,7 +160,7 @@ public class RecordCollectorTest {
 
     @SuppressWarnings("unchecked")
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() throws Exception {
+    public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() {
         final RecordCollector collector = new RecordCollectorImpl(
                 new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                     @Override
@@ -176,7 +176,7 @@ public class RecordCollectorTest {
 
     @SuppressWarnings("unchecked")
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionOnFlushIfASendFailed() throws Exception {
+    public void shouldThrowStreamsExceptionOnFlushIfASendFailed() {
         final RecordCollector collector = new RecordCollectorImpl(
                 new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                     @Override
@@ -192,7 +192,7 @@ public class RecordCollectorTest {
 
     @SuppressWarnings("unchecked")
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionOnCloseIfASendFailed() throws Exception {
+    public void shouldThrowStreamsExceptionOnCloseIfASendFailed() {
         final RecordCollector collector = new RecordCollectorImpl(
                 new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 6c45fd8..c6e9eac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -147,7 +147,7 @@ public class RecordQueueTest {
     }
 
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() throws Exception {
+    public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
         final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
                 new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, recordValue));
@@ -156,7 +156,7 @@ public class RecordQueueTest {
     }
 
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionWhenValueDeserializationFails() throws Exception {
+    public void shouldThrowStreamsExceptionWhenValueDeserializationFails() {
         final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
                 new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, value));

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
index 9ba8308..821dbe9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
@@ -41,21 +41,21 @@ public class SourceNodeRecordDeserializerTest {
 
 
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfKeyFailsToDeserialize() throws Exception {
+    public void shouldThrowStreamsExceptionIfKeyFailsToDeserialize() {
         final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
                 new TheSourceNode(true, false), null);
         recordDeserializer.deserialize(rawRecord);
     }
 
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfKeyValueFailsToDeserialize() throws Exception {
+    public void shouldThrowStreamsExceptionIfKeyValueFailsToDeserialize() {
         final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
                 new TheSourceNode(false, true), null);
         recordDeserializer.deserialize(rawRecord);
     }
 
     @Test
-    public void shouldReturnNewConsumerRecordWithDeserializedValueWhenNoExceptions() throws Exception {
+    public void shouldReturnNewConsumerRecordWithDeserializedValueWhenNoExceptions() {
         final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
                 new TheSourceNode(false, false, "key", "value"), null);
         final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(rawRecord);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 881da2a..fd63bd6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -119,7 +119,7 @@ public class StandbyTaskTest {
     private File baseDir;
     private StateDirectory stateDirectory;
 
-    private StreamsConfig createConfig(final File baseDir) throws Exception {
+    private StreamsConfig createConfig(final File baseDir) throws IOException {
         return new StreamsConfig(new Properties() {
             {
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
@@ -162,7 +162,7 @@ public class StandbyTaskTest {
     }
 
     @Test
-    public void testStorePartitions() throws Exception {
+    public void testStorePartitions() throws IOException {
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
         task.initialize();
@@ -171,8 +171,8 @@ public class StandbyTaskTest {
     }
 
     @SuppressWarnings("unchecked")
-    @Test(expected = Exception.class)
-    public void testUpdateNonPersistentStore() throws Exception {
+    @Test(expected = ProcessorStateException.class)
+    public void testUpdateNonPersistentStore() throws IOException {
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
 
@@ -185,7 +185,7 @@ public class StandbyTaskTest {
     }
 
     @Test
-    public void testUpdate() throws Exception {
+    public void testUpdate() throws IOException {
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
         task.initialize();
@@ -231,7 +231,7 @@ public class StandbyTaskTest {
     }
 
     @Test
-    public void testUpdateKTable() throws Exception {
+    public void testUpdateKTable() throws IOException {
         consumer.assign(Utils.mkList(ktable));
         Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
         committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(0L));
@@ -323,7 +323,7 @@ public class StandbyTaskTest {
     }
 
     @Test
-    public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStores() throws Exception {
+    public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStores() throws IOException {
         final String changelogName = "test-application-my-store-changelog";
         final List<TopicPartition> partitions = Utils.mkList(new TopicPartition(changelogName, 0));
         consumer.assign(partitions);
@@ -345,7 +345,7 @@ public class StandbyTaskTest {
     }
 
     @Test
-    public void shouldCheckpointStoreOffsetsOnCommit() throws Exception {
+    public void shouldCheckpointStoreOffsetsOnCommit() throws IOException {
         consumer.assign(Utils.mkList(ktable));
         final Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
         committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(100L));

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
index 7a860e3..8024cd9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -45,7 +46,7 @@ public class StateConsumerTest {
     private StateMaintainerStub stateMaintainer;
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         partitionOffsets.put(topicOne, 20L);
         partitionOffsets.put(topicTwo, 30L);
         stateMaintainer = new StateMaintainerStub(partitionOffsets);
@@ -53,20 +54,20 @@ public class StateConsumerTest {
     }
 
     @Test
-    public void shouldAssignPartitionsToConsumer() throws Exception {
+    public void shouldAssignPartitionsToConsumer() {
         stateConsumer.initialize();
         assertEquals(Utils.mkSet(topicOne, topicTwo), consumer.assignment());
     }
 
     @Test
-    public void shouldSeekToInitialOffsets() throws Exception {
+    public void shouldSeekToInitialOffsets() {
         stateConsumer.initialize();
         assertEquals(20L, consumer.position(topicOne));
         assertEquals(30L, consumer.position(topicTwo));
     }
 
     @Test
-    public void shouldUpdateStateWithReceivedRecordsForPartition() throws Exception {
+    public void shouldUpdateStateWithReceivedRecordsForPartition() {
         stateConsumer.initialize();
         consumer.addRecord(new ConsumerRecord<>("topic-one", 1, 20L, new byte[0], new byte[0]));
         consumer.addRecord(new ConsumerRecord<>("topic-one", 1, 21L, new byte[0], new byte[0]));
@@ -75,7 +76,7 @@ public class StateConsumerTest {
     }
 
     @Test
-    public void shouldUpdateStateWithReceivedRecordsForAllTopicPartition() throws Exception {
+    public void shouldUpdateStateWithReceivedRecordsForAllTopicPartition() {
         stateConsumer.initialize();
         consumer.addRecord(new ConsumerRecord<>("topic-one", 1, 20L, new byte[0], new byte[0]));
         consumer.addRecord(new ConsumerRecord<>("topic-two", 1, 31L, new byte[0], new byte[0]));
@@ -86,7 +87,7 @@ public class StateConsumerTest {
     }
 
     @Test
-    public void shouldFlushStoreWhenFlushIntervalHasLapsed() throws Exception {
+    public void shouldFlushStoreWhenFlushIntervalHasLapsed() {
         stateConsumer.initialize();
         consumer.addRecord(new ConsumerRecord<>("topic-one", 1, 20L, new byte[0], new byte[0]));
         time.sleep(FLUSH_INTERVAL);
@@ -96,7 +97,7 @@ public class StateConsumerTest {
     }
 
     @Test
-    public void shouldNotFlushOffsetsWhenFlushIntervalHasNotLapsed() throws Exception {
+    public void shouldNotFlushOffsetsWhenFlushIntervalHasNotLapsed() {
         stateConsumer.initialize();
         consumer.addRecord(new ConsumerRecord<>("topic-one", 1, 20L, new byte[0], new byte[0]));
         time.sleep(FLUSH_INTERVAL / 2);
@@ -105,7 +106,7 @@ public class StateConsumerTest {
     }
 
     @Test
-    public void shouldNotFlushWhenFlushIntervalIsZero() throws Exception {
+    public void shouldNotFlushWhenFlushIntervalIsZero() {
         stateConsumer = new GlobalStreamThread.StateConsumer("test", consumer, stateMaintainer, time, 10L, -1);
         stateConsumer.initialize();
         time.sleep(100);
@@ -114,13 +115,13 @@ public class StateConsumerTest {
     }
 
     @Test
-    public void shouldCloseConsumer() throws Exception {
+    public void shouldCloseConsumer() throws IOException {
         stateConsumer.close();
         assertTrue(consumer.closed());
     }
 
     @Test
-    public void shouldCloseStateMaintainer() throws Exception {
+    public void shouldCloseStateMaintainer() throws IOException {
         stateConsumer.close();
         assertTrue(stateMaintainer.closed);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 2b28090..886188d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -63,7 +63,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldCreateBaseDirectory() throws Exception {
+    public void shouldCreateBaseDirectory() {
         assertTrue(stateDir.exists());
         assertTrue(stateDir.isDirectory());
         assertTrue(appDir.exists());
@@ -71,7 +71,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldCreateTaskStateDirectory() throws Exception {
+    public void shouldCreateTaskStateDirectory() {
         final TaskId taskId = new TaskId(0, 0);
         final File taskDirectory = directory.directoryForTask(taskId);
         assertTrue(taskDirectory.exists());
@@ -79,7 +79,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldLockTaskStateDirectory() throws Exception {
+    public void shouldLockTaskStateDirectory() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
         final File taskDirectory = directory.directoryForTask(taskId);
 
@@ -100,7 +100,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldBeTrueIfAlreadyHoldsLock() throws Exception {
+    public void shouldBeTrueIfAlreadyHoldsLock() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
         directory.directoryForTask(taskId);
         directory.lock(taskId, 0);
@@ -112,7 +112,7 @@ public class StateDirectoryTest {
     }
 
     @Test(expected = ProcessorStateException.class)
-    public void shouldThrowProcessorStateException() throws Exception {
+    public void shouldThrowProcessorStateException() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
 
         Utils.delete(stateDir);
@@ -120,7 +120,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldNotLockDeletedDirectory() throws Exception {
+    public void shouldNotLockDeletedDirectory() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
 
         Utils.delete(stateDir);
@@ -128,7 +128,7 @@ public class StateDirectoryTest {
     }
     
     @Test
-    public void shouldLockMulitpleTaskDirectories() throws Exception {
+    public void shouldLockMulitpleTaskDirectories() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
         final File task1Dir = directory.directoryForTask(taskId);
         final TaskId taskId2 = new TaskId(1, 0);
@@ -159,7 +159,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldReleaseTaskStateDirectoryLock() throws Exception {
+    public void shouldReleaseTaskStateDirectoryLock() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
         final File taskDirectory = directory.directoryForTask(taskId);
 
@@ -177,7 +177,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws Exception {
+    public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws IOException {
         final TaskId task0 = new TaskId(0, 0);
         final TaskId task1 = new TaskId(1, 0);
         try {
@@ -198,7 +198,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay() throws Exception {
+    public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay() {
         final File dir = directory.directoryForTask(new TaskId(2, 0));
         final int cleanupDelayMs = 60000;
         directory.cleanRemovedTasks(cleanupDelayMs);
@@ -210,14 +210,14 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldNotRemoveNonTaskDirectoriesAndFiles() throws Exception {
+    public void shouldNotRemoveNonTaskDirectoriesAndFiles() {
         final File otherDir = TestUtils.tempDirectory(stateDir.toPath(), "foo");
         directory.cleanRemovedTasks(0);
         assertTrue(otherDir.exists());
     }
 
     @Test
-    public void shouldListAllTaskDirectories() throws Exception {
+    public void shouldListAllTaskDirectories() {
         TestUtils.tempDirectory(stateDir.toPath(), "foo");
         final File taskDir1 = directory.directoryForTask(new TaskId(0, 0));
         final File taskDir2 = directory.directoryForTask(new TaskId(0, 1));
@@ -229,7 +229,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldCreateDirectoriesIfParentDoesntExist() throws Exception {
+    public void shouldCreateDirectoriesIfParentDoesntExist() {
         final File tempDir = TestUtils.tempDirectory();
         final File stateDir = new File(new File(tempDir, "foo"), "state-dir");
         final StateDirectory stateDirectory = new StateDirectory(applicationId, stateDir.getPath(), time);
@@ -239,7 +239,7 @@ public class StateDirectoryTest {
     }
 
     @Test(expected = OverlappingFileLockException.class)
-    public void shouldLockGlobalStateDirectory() throws Exception {
+    public void shouldLockGlobalStateDirectory() throws IOException {
         directory.lockGlobalState(1);
 
         try (
@@ -255,7 +255,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldUnlockGlobalStateDirectory() throws Exception {
+    public void shouldUnlockGlobalStateDirectory() throws IOException {
         directory.lockGlobalState(1);
         directory.unlockGlobalState();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
index 8abce3b..c782790 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
@@ -44,28 +44,28 @@ public class StateRestorerTest {
     }
 
     @Test
-    public void shouldCallRestoreOnRestoreCallback() throws Exception {
+    public void shouldCallRestoreOnRestoreCallback() {
         restorer.restore(Collections.singletonList(KeyValue.pair(new byte[0], new byte[0])));
         assertThat(callback.restored.size(), equalTo(1));
     }
 
     @Test
-    public void shouldBeCompletedIfRecordOffsetGreaterThanEndOffset() throws Exception {
+    public void shouldBeCompletedIfRecordOffsetGreaterThanEndOffset() {
         assertTrue(restorer.hasCompleted(11, 10));
     }
 
     @Test
-    public void shouldBeCompletedIfRecordOffsetGreaterThanOffsetLimit() throws Exception {
+    public void shouldBeCompletedIfRecordOffsetGreaterThanOffsetLimit() {
         assertTrue(restorer.hasCompleted(51, 100));
     }
 
     @Test
-    public void shouldBeCompletedIfEndOffsetAndRecordOffsetAreZero() throws Exception {
+    public void shouldBeCompletedIfEndOffsetAndRecordOffsetAreZero() {
         assertTrue(restorer.hasCompleted(0, 0));
     }
 
     @Test
-    public void shouldBeCompletedIfOffsetAndOffsetLimitAreZero() throws Exception {
+    public void shouldBeCompletedIfOffsetAndOffsetLimitAreZero() {
         final StateRestorer
             restorer =
             new StateRestorer(new TopicPartition("topic", 1), compositeRestoreListener, null, 0, true,
@@ -74,7 +74,7 @@ public class StateRestorerTest {
     }
 
     @Test
-    public void shouldSetRestoredOffsetToMinOfLimitAndOffset() throws Exception {
+    public void shouldSetRestoredOffsetToMinOfLimitAndOffset() {
         restorer.setRestoredOffset(20);
         assertThat(restorer.restoredOffset(), equalTo(20L));
         restorer.setRestoredOffset(100);
@@ -82,7 +82,7 @@ public class StateRestorerTest {
     }
 
     @Test
-    public void shouldSetStartingOffsetToMinOfLimitAndOffset() throws Exception {
+    public void shouldSetStartingOffsetToMinOfLimitAndOffset() {
         restorer.setStartingOffset(20);
         assertThat(restorer.startingOffset(), equalTo(20L));
         restorer.setRestoredOffset(100);
@@ -90,7 +90,7 @@ public class StateRestorerTest {
     }
 
     @Test
-    public void shouldReturnCorrectNumRestoredRecords() throws Exception {
+    public void shouldReturnCorrectNumRestoredRecords() {
         restorer.setStartingOffset(20);
         restorer.setRestoredOffset(40);
         assertThat(restorer.restoredNumRecords(), equalTo(20L));

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index a480ec1..c574bbc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -60,7 +60,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldRequestTopicsAndHandleTimeoutException() throws Exception {
+    public void shouldRequestTopicsAndHandleTimeoutException() {
         final AtomicBoolean functionCalled = new AtomicBoolean(false);
         final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
             @Override
@@ -78,7 +78,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldThrowExceptionIfConsumerHasCurrentSubscription() throws Exception {
+    public void shouldThrowExceptionIfConsumerHasCurrentSubscription() {
         consumer.subscribe(Collections.singleton("sometopic"));
         try {
             changelogReader.restore();
@@ -89,7 +89,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() throws Exception {
+    public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
@@ -99,7 +99,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldRestoreMessagesFromCheckpoint() throws Exception {
+    public void shouldRestoreMessagesFromCheckpoint() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, 5L, Long.MAX_VALUE, true,
@@ -110,7 +110,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldClearAssignmentAtEndOfRestore() throws Exception {
+    public void shouldClearAssignmentAtEndOfRestore() {
         final int messages = 1;
         setupConsumer(messages, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
@@ -121,7 +121,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldRestoreToLimitWhenSupplied() throws Exception {
+    public void shouldRestoreToLimitWhenSupplied() {
         setupConsumer(10, topicPartition);
         final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener, null, 3, true,
                                                          "storeName");
@@ -132,7 +132,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldRestoreMultipleStores() throws Exception {
+    public void shouldRestoreMultipleStores() {
         final TopicPartition one = new TopicPartition("one", 0);
         final TopicPartition two = new TopicPartition("two", 0);
         final MockRestoreCallback callbackOne = new MockRestoreCallback();
@@ -206,7 +206,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldNotRestoreAnythingWhenPartitionIsEmpty() throws Exception {
+    public void shouldNotRestoreAnythingWhenPartitionIsEmpty() {
         final StateRestorer
             restorer =
             new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true, "storeName");
@@ -219,7 +219,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() throws Exception {
+    public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() {
         final Long endOffset = 10L;
         setupConsumer(endOffset, topicPartition);
         final StateRestorer
@@ -234,7 +234,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldReturnRestoredOffsetsForPersistentStores() throws Exception {
+    public void shouldReturnRestoredOffsetsForPersistentStores() {
         setupConsumer(10, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
                                                    "storeName"));
@@ -244,7 +244,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldNotReturnRestoredOffsetsForNonPersistentStore() throws Exception {
+    public void shouldNotReturnRestoredOffsetsForNonPersistentStore() {
         setupConsumer(10, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false,
                                                    "storeName"));
@@ -254,7 +254,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldIgnoreNullKeysWhenRestoring() throws Exception {
+    public void shouldIgnoreNullKeysWhenRestoring() {
         assignPartition(3, topicPartition);
         final byte[] bytes = new byte[0];
         consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 0, bytes, bytes));

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index e638a48..d9a215c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -708,7 +708,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws Exception {
+    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() {
         final String myEndPoint = "localhost:j87yhk";
         final String applicationId = "application-id";
         builder.setApplicationId(applicationId);
@@ -760,7 +760,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws Exception {
+    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() {
         final Cluster cluster = partitionAssignor.clusterMetadata();
         assertNotNull(cluster);
     }
@@ -968,12 +968,12 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test(expected = KafkaException.class)
-    public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() throws Exception {
+    public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() {
         partitionAssignor.configure(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
     }
 
     @Test(expected = KafkaException.class)
-    public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotThreadDataProviderInstance() throws Exception {
+    public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotThreadDataProviderInstance() {
         final Map<String, Object> config = new HashMap<>();
         config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         config.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, "i am not a stream thread");

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a9d43ba..2b719d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -137,7 +137,7 @@ public class StreamTaskTest {
         }
     };
 
-    private StreamsConfig createConfig(final boolean enableEoS) throws Exception {
+    private StreamsConfig createConfig(final boolean enableEoS) throws IOException {
         return new StreamsConfig(new Properties() {
             {
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-task-test");
@@ -153,7 +153,7 @@ public class StreamTaskTest {
     }
 
     @Before
-    public void setup() throws Exception {
+    public void setup() throws IOException {
         consumer.assign(Arrays.asList(partition1, partition2));
         source1.addChild(processorStreamTime);
         source2.addChild(processorStreamTime);
@@ -180,7 +180,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testProcessOrder() throws Exception {
+    public void testProcessOrder() {
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -236,7 +236,7 @@ public class StreamTaskTest {
 
 
     @Test
-    public void testMetrics() throws Exception {
+    public void testMetrics() {
         final String name = task.id().toString();
         final Map<String, String> metricTags = new LinkedHashMap<>();
         metricTags.put("task-id", name);
@@ -252,7 +252,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testPauseResume() throws Exception {
+    public void testPauseResume() {
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
@@ -305,7 +305,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testMaybePunctuateStreamTime() throws Exception {
+    public void testMaybePunctuateStreamTime() {
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -368,7 +368,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testCancelPunctuateStreamTime() throws Exception {
+    public void testCancelPunctuateStreamTime() {
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -397,7 +397,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldPunctuateSystemTimeWhenIntervalElapsed() throws Exception {
+    public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
         long now = time.milliseconds();
         time.sleep(10);
         assertTrue(task.maybePunctuateSystemTime());
@@ -409,7 +409,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() throws Exception {
+    public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
         long now = time.milliseconds();
         assertTrue(task.maybePunctuateSystemTime()); // first time we always punctuate
         time.sleep(9);
@@ -418,7 +418,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void testCancelPunctuateSystemTime() throws Exception {
+    public void testCancelPunctuateSystemTime() {
         long now = time.milliseconds();
         time.sleep(10);
         assertTrue(task.maybePunctuateSystemTime());
@@ -430,7 +430,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() throws Exception {
+    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() {
         final MockSourceNode processorNode = new MockSourceNode(topic1, intDeserializer, intDeserializer) {
 
             @Override
@@ -476,7 +476,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings(value = {"unchecked", "deprecation"})
     @Test
-    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingDeprecated() throws Exception {
+    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingDeprecated() {
         final Processor processor = new AbstractProcessor() {
             @Override
             public void init(final ProcessorContext context) {
@@ -512,7 +512,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() throws Exception {
+    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() {
         final Processor processor = new AbstractProcessor() {
             @Override
             public void init(final ProcessorContext context) {
@@ -544,7 +544,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldFlushRecordCollectorOnFlushState() throws Exception {
+    public void shouldFlushRecordCollectorOnFlushState() {
         final AtomicBoolean flushed = new AtomicBoolean(false);
         final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
         final StreamTask streamTask = new StreamTask(taskId00, "appId", partitions, topology, consumer,
@@ -566,7 +566,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldCheckpointOffsetsOnCommit() throws Exception {
+    public void shouldCheckpointOffsetsOnCommit() throws IOException {
         final String storeName = "test";
         final String changelogTopic = ProcessorStateManager.storeChangelogTopic("appId", storeName);
         final InMemoryKeyValueStore inMemoryStore = new InMemoryKeyValueStore(storeName, null, null) {
@@ -628,7 +628,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() throws Exception {
+    public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
         final Map<String, Object> properties = config.originals();
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
         final StreamsConfig testConfig = new StreamsConfig(properties);
@@ -693,7 +693,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() throws Exception {
+    public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
         ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processorStreamTime);
         try {
             task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator);
@@ -704,7 +704,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldCallPunctuateOnPassedInProcessorNode() throws Exception {
+    public void shouldCallPunctuateOnPassedInProcessorNode() {
         task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator);
         assertThat(punctuatedAt, equalTo(5L));
         task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator);
@@ -712,13 +712,13 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldSetProcessorNodeOnContextBackToNullAfterSuccesfullPunctuate() throws Exception {
+    public void shouldSetProcessorNodeOnContextBackToNullAfterSuccesfullPunctuate() {
         task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator);
         assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue());
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() throws Exception {
+    public void shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() {
         task.schedule(1, PunctuationType.STREAM_TIME, new Punctuator() {
             @Override
             public void punctuate(long timestamp) {
@@ -728,7 +728,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() throws Exception {
+    public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() {
         ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processorStreamTime);
         task.schedule(1, PunctuationType.STREAM_TIME, new Punctuator() {
             @Override
@@ -739,7 +739,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() throws Exception {
+    public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() {
         task.close(true, false);
         task = createTaskThatThrowsExceptionOnClose();
         task.initialize();
@@ -755,7 +755,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() throws Exception {
+    public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
@@ -765,7 +765,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() throws Exception {
+    public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             config, streamsMetrics, stateDirectory, null, time, producer);
@@ -775,7 +775,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() throws Exception {
+    public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
@@ -791,7 +791,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() throws Exception {
+    public void shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
@@ -802,7 +802,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldNotSendOffsetsAndCommitTransactionNorStartNewTransactionOnSuspendIfEosDisabled() throws Exception {
+    public void shouldNotSendOffsetsAndCommitTransactionNorStartNewTransactionOnSuspendIfEosDisabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             config, streamsMetrics, stateDirectory, null, time, producer);
@@ -818,7 +818,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldStartNewTransactionOnResumeIfEosEnabled() throws Exception {
+    public void shouldStartNewTransactionOnResumeIfEosEnabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
@@ -833,7 +833,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldNotStartNewTransactionOnResumeIfEosDisabled() throws Exception {
+    public void shouldNotStartNewTransactionOnResumeIfEosDisabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             config, streamsMetrics, stateDirectory, null, time, producer);
@@ -848,7 +848,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldStartNewTransactionOnCommitIfEosEnabled() throws Exception {
+    public void shouldStartNewTransactionOnCommitIfEosEnabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
@@ -862,7 +862,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldNotStartNewTransactionOnCommitIfEosDisabled() throws Exception {
+    public void shouldNotStartNewTransactionOnCommitIfEosDisabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             config, streamsMetrics, stateDirectory, null, time, producer);
@@ -876,7 +876,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() throws Exception {
+    public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             eosConfig, streamsMetrics, stateDirectory, null, time, producer);
@@ -898,7 +898,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() throws Exception {
+    public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
             config, streamsMetrics, stateDirectory, null, time, producer);
@@ -909,7 +909,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldCloseProducerOnCloseWhenEosEnabled() throws Exception {
+    public void shouldCloseProducerOnCloseWhenEosEnabled() {
         final MockProducer producer = new MockProducer();
 
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index a7923f8..fd9b19d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -90,7 +90,7 @@ public class StreamThreadTest {
     private final ConsumedInternal<Object, Object> consumed = new ConsumedInternal<>();
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         processId = UUID.randomUUID();
 
         internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(internalStreamsBuilder);
@@ -129,7 +129,7 @@ public class StreamThreadTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testPartitionAssignmentChangeForSingleGroup() throws Exception {
+    public void testPartitionAssignmentChangeForSingleGroup() {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, "topic1");
 
         final StreamThread thread = getStreamThread();
@@ -223,7 +223,7 @@ public class StreamThreadTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testPartitionAssignmentChangeForMultipleGroups() throws Exception {
+    public void testPartitionAssignmentChangeForMultipleGroups() {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, "topic1");
         internalTopologyBuilder.addSource(null, "source2", null, null, null, "topic2");
         internalTopologyBuilder.addSource(null, "source3", null, null, null, "topic3");
@@ -744,7 +744,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() throws Exception {
+    public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() {
         internalStreamsBuilder.stream(Collections.singleton("t1"), consumed).groupByKey().count("count-one");
         internalStreamsBuilder.stream(Collections.singleton("t2"), consumed).groupByKey().count("count-two");
 
@@ -810,7 +810,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws Exception {
+    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws InterruptedException {
         internalTopologyBuilder.addSource(null, "source", null, null, null, TOPIC);
         internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, null, "source");
 
@@ -871,7 +871,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed() throws Exception {
+    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed() {
         internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");
         internalTopologyBuilder.addSink("out", "output", null, null, null);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index ccba038..b22c488 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -127,12 +127,12 @@ public class StreamsMetadataStateTest {
     }
 
     @Test
-    public void shouldNotThrowNPEWhenOnChangeNotCalled() throws Exception {
+    public void shouldNotThrowNPEWhenOnChangeNotCalled() {
         new StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), hostOne).getAllMetadataForStore("store");
     }
 
     @Test
-    public void shouldGetAllStreamInstances() throws Exception {
+    public void shouldGetAllStreamInstances() {
         final StreamsMetadata one = new StreamsMetadata(hostOne, Utils.mkSet(globalTable, "table-one", "table-two", "merged-table"),
                 Utils.mkSet(topic1P0, topic2P1, topic4P0));
         final StreamsMetadata two = new StreamsMetadata(hostTwo, Utils.mkSet(globalTable, "table-two", "table-one", "merged-table"),
@@ -148,7 +148,7 @@ public class StreamsMetadataStateTest {
     }
 
     @Test
-    public void shouldGetAllStreamsInstancesWithNoStores() throws Exception {
+    public void shouldGetAllStreamsInstancesWithNoStores() {
         builder.stream("topic-five").filter(new Predicate<Object, Object>() {
             @Override
             public boolean test(final Object key, final Object value) {
@@ -169,7 +169,7 @@ public class StreamsMetadataStateTest {
     }
 
     @Test
-    public void shouldGetInstancesForStoreName() throws Exception {
+    public void shouldGetInstancesForStoreName() {
         final StreamsMetadata one = new StreamsMetadata(hostOne, Utils.mkSet(globalTable, "table-one", "table-two", "merged-table"),
                 Utils.mkSet(topic1P0, topic2P1, topic4P0));
         final StreamsMetadata two = new StreamsMetadata(hostTwo, Utils.mkSet(globalTable, "table-two", "table-one", "merged-table"),
@@ -181,18 +181,18 @@ public class StreamsMetadataStateTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldThrowIfStoreNameIsNullOnGetAllInstancesWithStore() throws Exception {
+    public void shouldThrowIfStoreNameIsNullOnGetAllInstancesWithStore() {
         discovery.getAllMetadataForStore(null);
     }
 
     @Test
-    public void shouldReturnEmptyCollectionOnGetAllInstancesWithStoreWhenStoreDoesntExist() throws Exception {
+    public void shouldReturnEmptyCollectionOnGetAllInstancesWithStoreWhenStoreDoesntExist() {
         final Collection<StreamsMetadata> actual = discovery.getAllMetadataForStore("not-a-store");
         assertTrue(actual.isEmpty());
     }
 
     @Test
-    public void shouldGetInstanceWithKey() throws Exception {
+    public void shouldGetInstanceWithKey() {
         final TopicPartition tp4 = new TopicPartition("topic-three", 1);
         hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4));
 
@@ -209,7 +209,7 @@ public class StreamsMetadataStateTest {
     }
 
     @Test
-    public void shouldGetInstanceWithKeyAndCustomPartitioner() throws Exception {
+    public void shouldGetInstanceWithKeyAndCustomPartitioner() {
         final TopicPartition tp4 = new TopicPartition("topic-three", 1);
         hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4));
 
@@ -223,14 +223,14 @@ public class StreamsMetadataStateTest {
     }
 
     @Test
-    public void shouldReturnNotAvailableWhenClusterIsEmpty() throws Exception {
+    public void shouldReturnNotAvailableWhenClusterIsEmpty() {
         discovery.onChange(Collections.<HostInfo, Set<TopicPartition>>emptyMap(), Cluster.empty());
         final StreamsMetadata result = discovery.getMetadataWithKey("table-one", "a", Serdes.String().serializer());
         assertEquals(StreamsMetadata.NOT_AVAILABLE, result);
     }
 
     @Test
-    public void shouldGetInstanceWithKeyWithMergedStreams() throws Exception {
+    public void shouldGetInstanceWithKeyWithMergedStreams() {
         final TopicPartition topic2P2 = new TopicPartition("topic-two", 2);
         hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1, topic2P2));
         discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(topic2P2, new PartitionInfo("topic-two", 2, null, null, null))));
@@ -250,7 +250,7 @@ public class StreamsMetadataStateTest {
     }
 
     @Test
-    public void shouldReturnNullOnGetWithKeyWhenStoreDoesntExist() throws Exception {
+    public void shouldReturnNullOnGetWithKeyWhenStoreDoesntExist() {
         final StreamsMetadata actual = discovery.getMetadataWithKey("not-a-store",
                 "key",
                 Serdes.String().serializer());
@@ -258,28 +258,28 @@ public class StreamsMetadataStateTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldThrowWhenKeyIsNull() throws Exception {
+    public void shouldThrowWhenKeyIsNull() {
         discovery.getMetadataWithKey("table-three", null, Serdes.String().serializer());
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldThrowWhenSerializerIsNull() throws Exception {
+    public void shouldThrowWhenSerializerIsNull() {
         discovery.getMetadataWithKey("table-three", "key", (Serializer) null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldThrowIfStoreNameIsNull() throws Exception {
+    public void shouldThrowIfStoreNameIsNull() {
         discovery.getMetadataWithKey(null, "key", Serdes.String().serializer());
     }
 
     @SuppressWarnings("unchecked")
     @Test(expected = NullPointerException.class)
-    public void shouldThrowIfStreamPartitionerIsNull() throws Exception {
+    public void shouldThrowIfStreamPartitionerIsNull() {
         discovery.getMetadataWithKey(null, "key", (StreamPartitioner) null);
     }
 
     @Test
-    public void shouldHaveGlobalStoreInAllMetadata() throws Exception {
+    public void shouldHaveGlobalStoreInAllMetadata() {
         final Collection<StreamsMetadata> metadata = discovery.getAllMetadataForStore(globalTable);
         assertEquals(3, metadata.size());
         for (StreamsMetadata streamsMetadata : metadata) {
@@ -288,26 +288,26 @@ public class StreamsMetadataStateTest {
     }
 
     @Test
-    public void shouldGetMyMetadataForGlobalStoreWithKey() throws Exception {
+    public void shouldGetMyMetadataForGlobalStoreWithKey() {
         final StreamsMetadata metadata = discovery.getMetadataWithKey(globalTable, "key", Serdes.String().serializer());
         assertEquals(hostOne, metadata.hostInfo());
     }
 
     @Test
-    public void shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() throws Exception {
+    public void shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() {
         final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), StreamsMetadataState.UNKNOWN_HOST);
         streamsMetadataState.onChange(hostToPartitions, cluster);
         assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", Serdes.String().serializer()));
     }
 
     @Test
-    public void shouldGetMyMetadataForGlobalStoreWithKeyAndPartitioner() throws Exception {
+    public void shouldGetMyMetadataForGlobalStoreWithKeyAndPartitioner() {
         final StreamsMetadata metadata = discovery.getMetadataWithKey(globalTable, "key", partitioner);
         assertEquals(hostOne, metadata.hostInfo());
     }
 
     @Test
-    public void shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() throws Exception {
+    public void shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() {
         final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), StreamsMetadataState.UNKNOWN_HOST);
         streamsMetadataState.onChange(hostToPartitions, cluster);
         assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", partitioner));

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
index 0e87a6d..215d4f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
@@ -31,7 +31,7 @@ import static org.junit.Assert.assertEquals;
 public class StreamsMetricsImplTest {
 
     @Test(expected = NullPointerException.class)
-    public void testNullMetrics() throws Exception {
+    public void testNullMetrics() {
         String groupName = "doesNotMatter";
         Map<String, String> tags = new HashMap<>();
         StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(null, groupName, tags);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index 9473a40..ec94ad8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -52,7 +52,7 @@ public class AssignmentInfoTest {
     }
 
     @Test
-    public void shouldDecodePreviousVersion() throws Exception {
+    public void shouldDecodePreviousVersion() throws IOException {
         List<TaskId> activeTasks =
                 Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0));
         Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
index d0743f1..034ae7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
@@ -32,19 +32,19 @@ public class ClientStateTest {
     private final ClientState client = new ClientState(1);
 
     @Test
-    public void shouldHaveNotReachedCapacityWhenAssignedTasksLessThanCapacity() throws Exception {
+    public void shouldHaveNotReachedCapacityWhenAssignedTasksLessThanCapacity() {
         assertFalse(client.reachedCapacity());
     }
 
     @Test
-    public void shouldHaveReachedCapacityWhenAssignedTasksGreaterThanOrEqualToCapacity() throws Exception {
+    public void shouldHaveReachedCapacityWhenAssignedTasksGreaterThanOrEqualToCapacity() {
         client.assign(new TaskId(0, 1), true);
         assertTrue(client.reachedCapacity());
     }
 
 
     @Test
-    public void shouldAddActiveTasksToBothAssignedAndActive() throws Exception {
+    public void shouldAddActiveTasksToBothAssignedAndActive() {
         final TaskId tid = new TaskId(0, 1);
 
         client.assign(tid, true);
@@ -55,7 +55,7 @@ public class ClientStateTest {
     }
 
     @Test
-    public void shouldAddStandbyTasksToBothStandbyAndActive() throws Exception {
+    public void shouldAddStandbyTasksToBothStandbyAndActive() {
         final TaskId tid = new TaskId(0, 1);
 
         client.assign(tid, false);
@@ -66,7 +66,7 @@ public class ClientStateTest {
     }
 
     @Test
-    public void shouldAddPreviousActiveTasksToPreviousAssignedAndPreviousActive() throws Exception {
+    public void shouldAddPreviousActiveTasksToPreviousAssignedAndPreviousActive() {
         final TaskId tid1 = new TaskId(0, 1);
         final TaskId tid2 = new TaskId(0, 2);
 
@@ -76,7 +76,7 @@ public class ClientStateTest {
     }
 
     @Test
-    public void shouldAddPreviousStandbyTasksToPreviousAssigned() throws Exception {
+    public void shouldAddPreviousStandbyTasksToPreviousAssigned() {
         final TaskId tid1 = new TaskId(0, 1);
         final TaskId tid2 = new TaskId(0, 2);
 
@@ -86,7 +86,7 @@ public class ClientStateTest {
     }
 
     @Test
-    public void shouldHaveAssignedTaskIfActiveTaskAssigned() throws Exception {
+    public void shouldHaveAssignedTaskIfActiveTaskAssigned() {
         final TaskId tid = new TaskId(0, 2);
 
         client.assign(tid, true);
@@ -94,7 +94,7 @@ public class ClientStateTest {
     }
 
     @Test
-    public void shouldHaveAssignedTaskIfStandbyTaskAssigned() throws Exception {
+    public void shouldHaveAssignedTaskIfStandbyTaskAssigned() {
         final TaskId tid = new TaskId(0, 2);
 
         client.assign(tid, false);
@@ -102,14 +102,14 @@ public class ClientStateTest {
     }
 
     @Test
-    public void shouldNotHaveAssignedTaskIfTaskNotAssigned() throws Exception {
+    public void shouldNotHaveAssignedTaskIfTaskNotAssigned() {
 
         client.assign(new TaskId(0, 2), true);
         assertFalse(client.hasAssignedTask(new TaskId(0, 3)));
     }
 
     @Test
-    public void shouldHaveMoreAvailableCapacityWhenCapacityTheSameButFewerAssignedTasks() throws Exception {
+    public void shouldHaveMoreAvailableCapacityWhenCapacityTheSameButFewerAssignedTasks() {
         final ClientState c2 = new ClientState(1);
         client.assign(new TaskId(0, 1), true);
         assertTrue(c2.hasMoreAvailableCapacityThan(client));
@@ -117,14 +117,14 @@ public class ClientStateTest {
     }
 
     @Test
-    public void shouldHaveMoreAvailableCapacityWhenCapacityHigherAndSameAssignedTaskCount() throws Exception {
+    public void shouldHaveMoreAvailableCapacityWhenCapacityHigherAndSameAssignedTaskCount() {
         final ClientState c2 = new ClientState(2);
         assertTrue(c2.hasMoreAvailableCapacityThan(client));
         assertFalse(client.hasMoreAvailableCapacityThan(c2));
     }
 
     @Test
-    public void shouldUseMultiplesOfCapacityToDetermineClientWithMoreAvailableCapacity() throws Exception {
+    public void shouldUseMultiplesOfCapacityToDetermineClientWithMoreAvailableCapacity() {
         final ClientState c2 = new ClientState(2);
 
         for (int i = 0; i < 7; i++) {
@@ -139,7 +139,7 @@ public class ClientStateTest {
     }
 
     @Test
-    public void shouldHaveMoreAvailableCapacityWhenCapacityIsTheSameButAssignedTasksIsLess() throws Exception {
+    public void shouldHaveMoreAvailableCapacityWhenCapacityIsTheSameButAssignedTasksIsLess() {
         final ClientState c1 = new ClientState(3);
         final ClientState c2 = new ClientState(3);
         for (int i = 0; i < 4; i++) {
@@ -151,26 +151,26 @@ public class ClientStateTest {
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldThrowIllegalStateExceptionIfCapacityOfThisClientStateIsZero() throws Exception {
+    public void shouldThrowIllegalStateExceptionIfCapacityOfThisClientStateIsZero() {
         final ClientState c1 = new ClientState(0);
         c1.hasMoreAvailableCapacityThan(new ClientState(1));
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldThrowIllegalStateExceptionIfCapacityOfOtherClientStateIsZero() throws Exception {
+    public void shouldThrowIllegalStateExceptionIfCapacityOfOtherClientStateIsZero() {
         final ClientState c1 = new ClientState(1);
         c1.hasMoreAvailableCapacityThan(new ClientState(0));
     }
 
     @Test
-    public void shouldHaveUnfulfilledQuotaWhenActiveTaskSizeLessThanCapacityTimesTasksPerThread() throws Exception {
+    public void shouldHaveUnfulfilledQuotaWhenActiveTaskSizeLessThanCapacityTimesTasksPerThread() {
         final ClientState client = new ClientState(1);
         client.assign(new TaskId(0, 1), true);
         assertTrue(client.hasUnfulfilledQuota(2));
     }
 
     @Test
-    public void shouldNotHaveUnfulfilledQuotaWhenActiveTaskSizeGreaterEqualThanCapacityTimesTasksPerThread() throws Exception {
+    public void shouldNotHaveUnfulfilledQuotaWhenActiveTaskSizeGreaterEqualThanCapacityTimesTasksPerThread() {
         final ClientState client = new ClientState(1);
         client.assign(new TaskId(0, 1), true);
         assertFalse(client.hasUnfulfilledQuota(1));

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 449dabd..86af0be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -51,7 +51,7 @@ public class StickyTaskAssignorTest {
     private final Integer p4 = 4;
 
     @Test
-    public void shouldAssignOneActiveTaskToEachProcessWhenTaskCountSameAsProcessCount() throws Exception {
+    public void shouldAssignOneActiveTaskToEachProcessWhenTaskCountSameAsProcessCount() {
         createClient(p1, 1);
         createClient(p2, 1);
         createClient(p3, 1);
@@ -65,7 +65,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldNotMigrateActiveTaskToOtherProcess() throws Exception {
+    public void shouldNotMigrateActiveTaskToOtherProcess() {
         createClientWithPreviousActiveTasks(p1, 1, task00);
         createClientWithPreviousActiveTasks(p2, 1, task01);
 
@@ -91,7 +91,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldMigrateActiveTasksToNewProcessWithoutChangingAllAssignments() throws Exception {
+    public void shouldMigrateActiveTasksToNewProcessWithoutChangingAllAssignments() {
         createClientWithPreviousActiveTasks(p1, 1, task00, task02);
         createClientWithPreviousActiveTasks(p2, 1, task01);
         createClient(p3, 1);
@@ -107,7 +107,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldAssignBasedOnCapacity() throws Exception {
+    public void shouldAssignBasedOnCapacity() {
         createClient(p1, 1);
         createClient(p2, 2);
         final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02);
@@ -152,7 +152,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldAssignTasksToClientWithPreviousStandbyTasks() throws Exception {
+    public void shouldAssignTasksToClientWithPreviousStandbyTasks() {
         final ClientState client1 = createClient(p1, 1);
         client1.addPreviousStandbyTasks(Utils.mkSet(task02));
         final ClientState client2 = createClient(p2, 1);
@@ -170,7 +170,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldAssignBasedOnCapacityWhenMultipleClientHaveStandbyTasks() throws Exception {
+    public void shouldAssignBasedOnCapacityWhenMultipleClientHaveStandbyTasks() {
         final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00);
         c1.addPreviousStandbyTasks(Utils.mkSet(task01));
         final ClientState c2 = createClientWithPreviousActiveTasks(p2, 2, task02);
@@ -185,7 +185,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldAssignStandbyTasksToDifferentClientThanCorrespondingActiveTaskIsAssingedTo() throws Exception {
+    public void shouldAssignStandbyTasksToDifferentClientThanCorrespondingActiveTaskIsAssingedTo() {
         createClientWithPreviousActiveTasks(p1, 1, task00);
         createClientWithPreviousActiveTasks(p2, 1, task01);
         createClientWithPreviousActiveTasks(p3, 1, task02);
@@ -215,7 +215,7 @@ public class StickyTaskAssignorTest {
 
 
     @Test
-    public void shouldAssignMultipleReplicasOfStandbyTask() throws Exception {
+    public void shouldAssignMultipleReplicasOfStandbyTask() {
         createClientWithPreviousActiveTasks(p1, 1, task00);
         createClientWithPreviousActiveTasks(p2, 1, task01);
         createClientWithPreviousActiveTasks(p3, 1, task02);
@@ -229,7 +229,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldNotAssignStandbyTaskReplicasWhenNoClientAvailableWithoutHavingTheTaskAssigned() throws Exception {
+    public void shouldNotAssignStandbyTaskReplicasWhenNoClientAvailableWithoutHavingTheTaskAssigned() {
         createClient(p1, 1);
         final StickyTaskAssignor taskAssignor = createTaskAssignor(task00);
         taskAssignor.assign(1);
@@ -237,7 +237,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldAssignActiveAndStandbyTasks() throws Exception {
+    public void shouldAssignActiveAndStandbyTasks() {
         createClient(p1, 1);
         createClient(p2, 1);
         createClient(p3, 1);
@@ -251,7 +251,7 @@ public class StickyTaskAssignorTest {
 
 
     @Test
-    public void shouldAssignAtLeastOneTaskToEachClientIfPossible() throws Exception {
+    public void shouldAssignAtLeastOneTaskToEachClientIfPossible() {
         createClient(p1, 3);
         createClient(p2, 1);
         createClient(p3, 1);
@@ -264,7 +264,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldAssignEachActiveTaskToOneClientWhenMoreClientsThanTasks() throws Exception {
+    public void shouldAssignEachActiveTaskToOneClientWhenMoreClientsThanTasks() {
         createClient(p1, 1);
         createClient(p2, 1);
         createClient(p3, 1);
@@ -279,7 +279,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldBalanceActiveAndStandbyTasksAcrossAvailableClients() throws Exception {
+    public void shouldBalanceActiveAndStandbyTasksAcrossAvailableClients() {
         createClient(p1, 1);
         createClient(p2, 1);
         createClient(p3, 1);
@@ -296,7 +296,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldAssignMoreTasksToClientWithMoreCapacity() throws Exception {
+    public void shouldAssignMoreTasksToClientWithMoreCapacity() {
         createClient(p2, 2);
         createClient(p1, 1);
 
@@ -320,7 +320,7 @@ public class StickyTaskAssignorTest {
 
 
     @Test
-    public void shouldNotHaveSameAssignmentOnAnyTwoHosts() throws Exception {
+    public void shouldNotHaveSameAssignmentOnAnyTwoHosts() {
         createClient(p1, 1);
         createClient(p2, 1);
         createClient(p3, 1);
@@ -342,7 +342,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousActiveTasks() throws Exception {
+    public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousActiveTasks() {
         createClientWithPreviousActiveTasks(p1, 1, task01, task02);
         createClientWithPreviousActiveTasks(p2, 1, task03);
         createClientWithPreviousActiveTasks(p3, 1, task00);
@@ -364,7 +364,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() throws Exception {
+    public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() {
         final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task02);
         c1.addPreviousStandbyTasks(Utils.mkSet(task03, task00));
         final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task03, task00);
@@ -389,7 +389,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldReBalanceTasksAcrossAllClientsWhenCapacityAndTaskCountTheSame() throws Exception {
+    public void shouldReBalanceTasksAcrossAllClientsWhenCapacityAndTaskCountTheSame() {
         createClientWithPreviousActiveTasks(p3, 1, task00, task01, task02, task03);
         createClient(p1, 1);
         createClient(p2, 1);
@@ -405,7 +405,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldReBalanceTasksAcrossClientsWhenCapacityLessThanTaskCount() throws Exception {
+    public void shouldReBalanceTasksAcrossClientsWhenCapacityLessThanTaskCount() {
         createClientWithPreviousActiveTasks(p3, 1, task00, task01, task02, task03);
         createClient(p1, 1);
         createClient(p2, 1);
@@ -419,7 +419,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldRebalanceTasksToClientsBasedOnCapacity() throws Exception {
+    public void shouldRebalanceTasksToClientsBasedOnCapacity() {
         createClientWithPreviousActiveTasks(p2, 1, task00, task03, task02);
         createClient(p3, 2);
         final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task02, task03);
@@ -429,7 +429,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldMoveMinimalNumberOfTasksWhenPreviouslyAboveCapacityAndNewClientAdded() throws Exception {
+    public void shouldMoveMinimalNumberOfTasksWhenPreviouslyAboveCapacityAndNewClientAdded() {
         final Set<TaskId> p1PrevTasks = Utils.mkSet(task00, task02);
         final Set<TaskId> p2PrevTasks = Utils.mkSet(task01, task03);
 
@@ -450,7 +450,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldNotMoveAnyTasksWhenNewTasksAdded() throws Exception {
+    public void shouldNotMoveAnyTasksWhenNewTasksAdded() {
         createClientWithPreviousActiveTasks(p1, 1, task00, task01);
         createClientWithPreviousActiveTasks(p2, 1, task02, task03);
 
@@ -462,7 +462,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldAssignNewTasksToNewClientWhenPreviousTasksAssignedToOldClients() throws Exception {
+    public void shouldAssignNewTasksToNewClientWhenPreviousTasksAssignedToOldClients() {
 
         createClientWithPreviousActiveTasks(p1, 1, task02, task01);
         createClientWithPreviousActiveTasks(p2, 1, task00, task03);
@@ -477,7 +477,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldAssignTasksNotPreviouslyActiveToNewClient() throws Exception {
+    public void shouldAssignTasksNotPreviouslyActiveToNewClient() {
         final TaskId task10 = new TaskId(0, 10);
         final TaskId task11 = new TaskId(0, 11);
         final TaskId task12 = new TaskId(1, 2);
@@ -507,7 +507,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldAssignTasksNotPreviouslyActiveToMultipleNewClients() throws Exception {
+    public void shouldAssignTasksNotPreviouslyActiveToMultipleNewClients() {
         final TaskId task10 = new TaskId(0, 10);
         final TaskId task11 = new TaskId(0, 11);
         final TaskId task12 = new TaskId(1, 2);
@@ -538,7 +538,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldAssignTasksToNewClient() throws Exception {
+    public void shouldAssignTasksToNewClient() {
         createClientWithPreviousActiveTasks(p1, 1, task01, task02);
         createClient(p2, 1);
         createTaskAssignor(task01, task02).assign(0);
@@ -546,7 +546,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingClients() throws Exception {
+    public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingClients() {
         final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02);
         final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task03, task04, task05);
         final ClientState newClient = createClient(p3, 1);
@@ -565,7 +565,7 @@ public class StickyTaskAssignorTest {
     }
 
     @Test
-    public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingAndBouncedClients() throws Exception {
+    public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingAndBouncedClients() {
         final TaskId task06 = new TaskId(0, 6);
         final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02, task06);
         final ClientState c2 = createClient(p2, 1);


Mime
View raw message