From: damianguy@apache.org
To: commits@kafka.apache.org
Reply-To: dev@kafka.apache.org
Date: Mon, 11 Sep 2017 08:42:20 -0000
In-Reply-To: <23678de4a6df4cc0a9f570750cf023b8@git.apache.org>
References: <23678de4a6df4cc0a9f570750cf023b8@git.apache.org>
Subject: [3/5] kafka git commit: KAFKA-5531; throw concrete exceptions in streams tests
archived-at: Mon, 11 Sep 2017 08:42:22 -0000
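This patch applies one mechanical pattern across the streams tests: a test method that declared a blanket "throws Exception" now declares only the concrete checked exception its body can actually throw (typically IOException, from writing checkpoint files or building a StreamsConfig against a temp directory), or drops the clause entirely when nothing checked is thrown. A minimal sketch of the pattern, borrowing the OffsetCheckpoint fixture used by the tests below (the test name itself is made up for illustration):

    // Before: the blanket clause hides which checked failure the body can hit.
    @Test
    public void shouldWriteCheckpoint() throws Exception {
        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
    }

    // After: only IOException, which OffsetCheckpoint.write() declares, remains.
    @Test
    public void shouldWriteCheckpoint() throws IOException {
        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
    }

The narrower clause keeps signatures honest: if a test body later gains a new checked exception, the compiler forces the author to acknowledge it instead of letting it ride through "throws Exception".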
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 8aedf36..fbf45b3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -312,7 +312,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception {
+    public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws IOException {
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             new TaskId(0, 1),
             noPartitions,
@@ -326,7 +326,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldNotChangeOffsetsIfAckedOffsetsIsNull() throws Exception {
+    public void shouldNotChangeOffsetsIfAckedOffsetsIsNull() throws IOException {
         final Map offsets = Collections.singletonMap(persistentStorePartition, 99L);
         checkpoint.write(offsets);
@@ -346,7 +346,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldWriteCheckpointForPersistentLogEnabledStore() throws Exception {
+    public void shouldWriteCheckpointForPersistentLogEnabledStore() throws IOException {
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             taskId,
             noPartitions,
@@ -363,7 +363,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldWriteCheckpointForStandbyReplica() throws Exception {
+    public void shouldWriteCheckpointForStandbyReplica() throws IOException {
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             taskId,
             noPartitions,
@@ -391,7 +391,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldNotWriteCheckpointForNonPersistent() throws Exception {
+    public void shouldNotWriteCheckpointForNonPersistent() throws IOException {
         final TopicPartition topicPartition = new TopicPartition(nonPersistentStoreTopicName, 1);
 
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
@@ -411,7 +411,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws Exception {
+    public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws IOException {
         final ProcessorStateManager stateMgr = new ProcessorStateManager(
             taskId,
             noPartitions,
@@ -431,7 +431,7 @@ public class ProcessorStateManagerTest {
 
     @Test
-    public void shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName() throws Exception {
+    public void shouldThrowIllegalArgumentExceptionIfStoreNameIsSameAsCheckpointFileName() throws IOException {
         final ProcessorStateManager stateManager = new ProcessorStateManager(
             taskId,
             noPartitions,
@@ -450,7 +450,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered() throws Exception {
+    public void shouldThrowIllegalArgumentExceptionOnRegisterWhenStoreHasAlreadyBeenRegistered() throws IOException {
         final ProcessorStateManager stateManager = new ProcessorStateManager(
             taskId,
             noPartitions,
@@ -471,7 +471,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() throws Exception {
+    public void shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAnException() throws IOException {
 
         final ProcessorStateManager stateManager = new ProcessorStateManager(
             taskId,
@@ -499,7 +499,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws Exception {
+    public void shouldDeleteCheckpointFileOnCreationIfEosEnabled() throws IOException {
         checkpoint.write(Collections.emptyMap());
         assertTrue(checkpointFile.exists());

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 1ea09cd..da2e5dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -204,7 +204,7 @@ public class ProcessorTopologyTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldDriveGlobalStore() throws Exception {
+    public void shouldDriveGlobalStore() {
         final StateStoreSupplier storeSupplier = Stores.create("my-store")
             .withStringKeys().withStringValues().inMemory().disableLogging().build();
         final String global = "global";
@@ -271,7 +271,7 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void shouldCreateStringWithSourceAndTopics() throws Exception {
+    public void shouldCreateStringWithSourceAndTopics() {
         builder.addSource("source", "topic1", "topic2");
         final ProcessorTopology topology = builder.build(null);
         final String result = topology.toString();
@@ -279,7 +279,7 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void shouldCreateStringWithMultipleSourcesAndTopics() throws Exception {
+    public void shouldCreateStringWithMultipleSourcesAndTopics() {
         builder.addSource("source", "topic1", "topic2");
         builder.addSource("source2", "t", "t1", "t2");
         final ProcessorTopology topology = builder.build(null);
@@ -289,7 +289,7 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void shouldCreateStringWithProcessors() throws Exception {
+    public void shouldCreateStringWithProcessors() {
         builder.addSource("source", "t")
             .addProcessor("processor", mockProcessorSupplier, "source")
             .addProcessor("other", mockProcessorSupplier, "source");
@@ -301,7 +301,7 @@ public class ProcessorTopologyTest {
     }
 
     @Test
-    public void shouldRecursivelyPrintChildren() throws Exception {
+    public void shouldRecursivelyPrintChildren() {
         builder.addSource("source", "t")
             .addProcessor("processor", mockProcessorSupplier, "source")
             .addProcessor("child-one", mockProcessorSupplier, "processor")

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 1aa7f38..09f46e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -123,7 +123,7 @@ public class RecordCollectorTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception {
+    public void shouldRetryWhenTimeoutExceptionOccursOnSend() {
         final AtomicInteger attempt = new AtomicInteger(0);
         final RecordCollectorImpl collector = new RecordCollectorImpl(
             new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@@ -144,7 +144,7 @@ public class RecordCollectorTest {
 
     @SuppressWarnings("unchecked")
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionAfterMaxAttempts() throws Exception {
+    public void shouldThrowStreamsExceptionAfterMaxAttempts() {
         final RecordCollector collector = new RecordCollectorImpl(
             new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                 @Override
@@ -160,7 +160,7 @@ public class RecordCollectorTest {
 
     @SuppressWarnings("unchecked")
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() throws Exception {
+    public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() {
         final RecordCollector collector = new RecordCollectorImpl(
             new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                 @Override
@@ -176,7 +176,7 @@ public class RecordCollectorTest {
 
     @SuppressWarnings("unchecked")
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionOnFlushIfASendFailed() throws Exception {
+    public void shouldThrowStreamsExceptionOnFlushIfASendFailed() {
         final RecordCollector collector = new RecordCollectorImpl(
             new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                 @Override
@@ -192,7 +192,7 @@ public class RecordCollectorTest {
 
     @SuppressWarnings("unchecked")
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionOnCloseIfASendFailed() throws Exception {
+    public void shouldThrowStreamsExceptionOnCloseIfASendFailed() {
         final RecordCollector collector = new RecordCollectorImpl(
             new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
                 @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 6c45fd8..c6e9eac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -147,7 +147,7 @@ public class RecordQueueTest {
     }
 
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() throws Exception {
+    public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
         final byte[] key = Serdes.Long().serializer().serialize("foo", 1L);
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
             new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, key, recordValue));
@@ -156,7 +156,7 @@ public class RecordQueueTest {
     }
 
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionWhenValueDeserializationFails() throws Exception {
+    public void shouldThrowStreamsExceptionWhenValueDeserializationFails() {
         final byte[] value = Serdes.Long().serializer().serialize("foo", 1L);
         final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
             new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, value));

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
index 9ba8308..821dbe9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
@@ -41,21 +41,21 @@ public class SourceNodeRecordDeserializerTest {
 
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfKeyFailsToDeserialize() throws Exception {
+    public void shouldThrowStreamsExceptionIfKeyFailsToDeserialize() {
         final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
             new TheSourceNode(true, false), null);
         recordDeserializer.deserialize(rawRecord);
     }
 
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfKeyValueFailsToDeserialize() throws Exception {
+    public void shouldThrowStreamsExceptionIfKeyValueFailsToDeserialize() {
         final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
             new TheSourceNode(false, true), null);
         recordDeserializer.deserialize(rawRecord);
     }
 
     @Test
-    public void shouldReturnNewConsumerRecordWithDeserializedValueWhenNoExceptions() throws Exception {
+    public void shouldReturnNewConsumerRecordWithDeserializedValueWhenNoExceptions() {
         final SourceNodeRecordDeserializer recordDeserializer = new SourceNodeRecordDeserializer(
             new TheSourceNode(false, false, "key", "value"), null);
         final ConsumerRecord record = recordDeserializer.deserialize(rawRecord);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 881da2a..fd63bd6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -119,7 +119,7 @@ public class StandbyTaskTest {
     private File baseDir;
     private StateDirectory stateDirectory;
 
-    private StreamsConfig createConfig(final File baseDir) throws Exception {
+    private StreamsConfig createConfig(final File baseDir) throws IOException {
         return new StreamsConfig(new Properties() {
             {
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
@@ -162,7 +162,7 @@ public class StandbyTaskTest {
     }
 
     @Test
-    public void testStorePartitions() throws Exception {
+    public void testStorePartitions() throws IOException {
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
         task.initialize();
@@ -171,8 +171,8 @@ public class StandbyTaskTest {
     }
 
     @SuppressWarnings("unchecked")
-    @Test(expected = Exception.class)
-    public void testUpdateNonPersistentStore() throws Exception {
+    @Test(expected = ProcessorStateException.class)
+    public void testUpdateNonPersistentStore() throws IOException {
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, changelogReader,
             config, null, stateDirectory);
@@ -185,7 +185,7 @@ public class StandbyTaskTest {
     }
 
     @Test
-    public void testUpdate() throws Exception {
+    public void testUpdate() throws IOException {
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
         task.initialize();
@@ -231,7 +231,7 @@ public class StandbyTaskTest {
     }
 
     @Test
-    public void testUpdateKTable() throws Exception {
+    public void testUpdateKTable() throws IOException {
         consumer.assign(Utils.mkList(ktable));
         Map committedOffsets = new HashMap<>();
         committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(0L));
@@ -323,7 +323,7 @@ public class StandbyTaskTest {
     }
 
     @Test
-    public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStores() throws Exception {
+    public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStores() throws IOException {
         final String changelogName = "test-application-my-store-changelog";
         final List partitions = Utils.mkList(new TopicPartition(changelogName, 0));
         consumer.assign(partitions);
@@ -345,7 +345,7 @@ public class StandbyTaskTest {
     }
 
     @Test
-    public void shouldCheckpointStoreOffsetsOnCommit() throws Exception {
+    public void shouldCheckpointStoreOffsetsOnCommit() throws IOException {
         consumer.assign(Utils.mkList(ktable));
         final Map committedOffsets = new HashMap<>();
         committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(100L));
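One change above goes beyond the signature cleanup: testUpdateNonPersistentStore now expects the concrete ProcessorStateException instead of the catch-all Exception. Expecting the concrete type means an unrelated failure (a NullPointerException from broken setup, say) fails the test instead of satisfying the expectation. An illustrative sketch, not part of the patch, reusing fixture names from the StandbyTaskTest diff; the partition constant is hypothetical:

    // Sketch only: updating a standby task on a partition whose store is not
    // persistent should surface the concrete ProcessorStateException.
    @Test(expected = ProcessorStateException.class)
    public void shouldThrowProcessorStateExceptionOnNonPersistentStoreUpdate() throws IOException {
        final StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions,
            topology, consumer, changelogReader, createConfig(baseDir), null, stateDirectory);
        task.initialize();
        // nonPersistentPartition is a placeholder for a partition backed by a non-persistent store.
        task.update(nonPersistentPartition, Collections.<ConsumerRecord<byte[], byte[]>>emptyList());
    }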
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
index 7a860e3..8024cd9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -45,7 +46,7 @@ public class StateConsumerTest {
     private StateMaintainerStub stateMaintainer;
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         partitionOffsets.put(topicOne, 20L);
         partitionOffsets.put(topicTwo, 30L);
         stateMaintainer = new StateMaintainerStub(partitionOffsets);
@@ -53,20 +54,20 @@ public class StateConsumerTest {
     }
 
     @Test
-    public void shouldAssignPartitionsToConsumer() throws Exception {
+    public void shouldAssignPartitionsToConsumer() {
         stateConsumer.initialize();
         assertEquals(Utils.mkSet(topicOne, topicTwo), consumer.assignment());
     }
 
     @Test
-    public void shouldSeekToInitialOffsets() throws Exception {
+    public void shouldSeekToInitialOffsets() {
         stateConsumer.initialize();
         assertEquals(20L, consumer.position(topicOne));
         assertEquals(30L, consumer.position(topicTwo));
     }
 
     @Test
-    public void shouldUpdateStateWithReceivedRecordsForPartition() throws Exception {
+    public void shouldUpdateStateWithReceivedRecordsForPartition() {
         stateConsumer.initialize();
         consumer.addRecord(new ConsumerRecord<>("topic-one", 1, 20L, new byte[0], new byte[0]));
         consumer.addRecord(new ConsumerRecord<>("topic-one", 1, 21L, new byte[0], new byte[0]));
@@ -75,7 +76,7 @@ public class StateConsumerTest {
     }
 
     @Test
-    public void shouldUpdateStateWithReceivedRecordsForAllTopicPartition() throws Exception {
+    public void shouldUpdateStateWithReceivedRecordsForAllTopicPartition() {
         stateConsumer.initialize();
         consumer.addRecord(new ConsumerRecord<>("topic-one", 1, 20L, new byte[0], new byte[0]));
         consumer.addRecord(new ConsumerRecord<>("topic-two", 1, 31L, new byte[0], new byte[0]));
@@ -86,7 +87,7 @@ public class StateConsumerTest {
     }
 
     @Test
-    public void shouldFlushStoreWhenFlushIntervalHasLapsed() throws Exception {
+    public void shouldFlushStoreWhenFlushIntervalHasLapsed() {
         stateConsumer.initialize();
         consumer.addRecord(new ConsumerRecord<>("topic-one", 1, 20L, new byte[0], new byte[0]));
         time.sleep(FLUSH_INTERVAL);
@@ -96,7 +97,7 @@ public class StateConsumerTest {
     }
 
     @Test
-    public void shouldNotFlushOffsetsWhenFlushIntervalHasNotLapsed() throws Exception {
+    public void shouldNotFlushOffsetsWhenFlushIntervalHasNotLapsed() {
         stateConsumer.initialize();
         consumer.addRecord(new ConsumerRecord<>("topic-one", 1, 20L, new byte[0], new byte[0]));
         time.sleep(FLUSH_INTERVAL / 2);
@@ -105,7 +106,7 @@ public class StateConsumerTest {
     }
 
     @Test
-    public void shouldNotFlushWhenFlushIntervalIsZero() throws Exception {
+    public void shouldNotFlushWhenFlushIntervalIsZero() {
         stateConsumer = new GlobalStreamThread.StateConsumer("test", consumer, stateMaintainer, time, 10L, -1);
         stateConsumer.initialize();
         time.sleep(100);
@@ -114,13 +115,13 @@ public class StateConsumerTest {
     }
 
     @Test
-    public void shouldCloseConsumer() throws Exception {
+    public void shouldCloseConsumer() throws IOException {
         stateConsumer.close();
         assertTrue(consumer.closed());
     }
 
     @Test
-    public void shouldCloseStateMaintainer() throws Exception {
+    public void shouldCloseStateMaintainer() throws IOException {
         stateConsumer.close();
         assertTrue(stateMaintainer.closed);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 2b28090..886188d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -63,7 +63,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldCreateBaseDirectory() throws Exception {
+    public void shouldCreateBaseDirectory() {
         assertTrue(stateDir.exists());
         assertTrue(stateDir.isDirectory());
         assertTrue(appDir.exists());
@@ -71,7 +71,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldCreateTaskStateDirectory() throws Exception {
+    public void shouldCreateTaskStateDirectory() {
         final TaskId taskId = new TaskId(0, 0);
         final File taskDirectory = directory.directoryForTask(taskId);
         assertTrue(taskDirectory.exists());
@@ -79,7 +79,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldLockTaskStateDirectory() throws Exception {
+    public void shouldLockTaskStateDirectory() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
         final File taskDirectory = directory.directoryForTask(taskId);
 
@@ -100,7 +100,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldBeTrueIfAlreadyHoldsLock() throws Exception {
+    public void shouldBeTrueIfAlreadyHoldsLock() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
         directory.directoryForTask(taskId);
         directory.lock(taskId, 0);
@@ -112,7 +112,7 @@ public class StateDirectoryTest {
     }
 
     @Test(expected = ProcessorStateException.class)
-    public void shouldThrowProcessorStateException() throws Exception {
+    public void shouldThrowProcessorStateException() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
 
         Utils.delete(stateDir);
@@ -120,7 +120,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldNotLockDeletedDirectory() throws Exception {
+    public void shouldNotLockDeletedDirectory() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
 
         Utils.delete(stateDir);
@@ -128,7 +128,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldLockMulitpleTaskDirectories() throws Exception {
+    public void shouldLockMulitpleTaskDirectories() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
         final File task1Dir = directory.directoryForTask(taskId);
         final TaskId taskId2 = new TaskId(1, 0);
@@ -159,7 +159,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldReleaseTaskStateDirectoryLock() throws Exception {
+    public void shouldReleaseTaskStateDirectoryLock() throws IOException {
         final TaskId taskId = new TaskId(0, 0);
         final File taskDirectory = directory.directoryForTask(taskId);
 
@@ -177,7 +177,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws Exception {
+    public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws IOException {
         final TaskId task0 = new TaskId(0, 0);
         final TaskId task1 = new TaskId(1, 0);
         try {
@@ -198,7 +198,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay() throws Exception {
+    public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay() {
         final File dir = directory.directoryForTask(new TaskId(2, 0));
         final int cleanupDelayMs = 60000;
         directory.cleanRemovedTasks(cleanupDelayMs);
@@ -210,14 +210,14 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldNotRemoveNonTaskDirectoriesAndFiles() throws Exception {
+    public void shouldNotRemoveNonTaskDirectoriesAndFiles() {
         final File otherDir = TestUtils.tempDirectory(stateDir.toPath(), "foo");
         directory.cleanRemovedTasks(0);
         assertTrue(otherDir.exists());
     }
 
     @Test
-    public void shouldListAllTaskDirectories() throws Exception {
+    public void shouldListAllTaskDirectories() {
         TestUtils.tempDirectory(stateDir.toPath(), "foo");
         final File taskDir1 = directory.directoryForTask(new TaskId(0, 0));
         final File taskDir2 = directory.directoryForTask(new TaskId(0, 1));
@@ -229,7 +229,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldCreateDirectoriesIfParentDoesntExist() throws Exception {
+    public void shouldCreateDirectoriesIfParentDoesntExist() {
         final File tempDir = TestUtils.tempDirectory();
         final File stateDir = new File(new File(tempDir, "foo"), "state-dir");
         final StateDirectory stateDirectory = new StateDirectory(applicationId, stateDir.getPath(), time);
@@ -239,7 +239,7 @@ public class StateDirectoryTest {
     }
 
     @Test(expected = OverlappingFileLockException.class)
-    public void shouldLockGlobalStateDirectory() throws Exception {
+    public void shouldLockGlobalStateDirectory() throws IOException {
         directory.lockGlobalState(1);
 
         try (
@@ -255,7 +255,7 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldUnlockGlobalStateDirectory() throws Exception {
+    public void shouldUnlockGlobalStateDirectory() throws IOException {
         directory.lockGlobalState(1);
         directory.unlockGlobalState();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
index 8abce3b..c782790 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
@@ -44,28 +44,28 @@ public class StateRestorerTest {
     }
 
     @Test
-    public void shouldCallRestoreOnRestoreCallback() throws Exception {
+    public void shouldCallRestoreOnRestoreCallback() {
         restorer.restore(Collections.singletonList(KeyValue.pair(new byte[0], new byte[0])));
         assertThat(callback.restored.size(), equalTo(1));
     }
 
     @Test
-    public void shouldBeCompletedIfRecordOffsetGreaterThanEndOffset() throws Exception {
+    public void shouldBeCompletedIfRecordOffsetGreaterThanEndOffset() {
         assertTrue(restorer.hasCompleted(11, 10));
     }
 
     @Test
-    public void shouldBeCompletedIfRecordOffsetGreaterThanOffsetLimit() throws Exception {
+    public void shouldBeCompletedIfRecordOffsetGreaterThanOffsetLimit() {
         assertTrue(restorer.hasCompleted(51, 100));
     }
 
     @Test
-    public void shouldBeCompletedIfEndOffsetAndRecordOffsetAreZero() throws Exception {
+    public void shouldBeCompletedIfEndOffsetAndRecordOffsetAreZero() {
         assertTrue(restorer.hasCompleted(0, 0));
     }
 
     @Test
-    public void shouldBeCompletedIfOffsetAndOffsetLimitAreZero() throws Exception {
+    public void shouldBeCompletedIfOffsetAndOffsetLimitAreZero() {
         final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), compositeRestoreListener, null, 0, true,
@@ -74,7 +74,7 @@ public class StateRestorerTest {
     }
 
     @Test
-    public void shouldSetRestoredOffsetToMinOfLimitAndOffset() throws Exception {
+    public void shouldSetRestoredOffsetToMinOfLimitAndOffset() {
         restorer.setRestoredOffset(20);
         assertThat(restorer.restoredOffset(), equalTo(20L));
         restorer.setRestoredOffset(100);
@@ -82,7 +82,7 @@ public class StateRestorerTest {
     }
 
     @Test
-    public void shouldSetStartingOffsetToMinOfLimitAndOffset() throws Exception {
+    public void shouldSetStartingOffsetToMinOfLimitAndOffset() {
         restorer.setStartingOffset(20);
         assertThat(restorer.startingOffset(), equalTo(20L));
         restorer.setRestoredOffset(100);
@@ -90,7 +90,7 @@ public class StateRestorerTest {
     }
 
     @Test
-    public void shouldReturnCorrectNumRestoredRecords() throws Exception {
+    public void shouldReturnCorrectNumRestoredRecords() {
         restorer.setStartingOffset(20);
         restorer.setRestoredOffset(40);
         assertThat(restorer.restoredNumRecords(), equalTo(20L));

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index a480ec1..c574bbc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -60,7 +60,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldRequestTopicsAndHandleTimeoutException() throws Exception {
+    public void shouldRequestTopicsAndHandleTimeoutException() {
         final AtomicBoolean functionCalled = new AtomicBoolean(false);
         final MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
             @Override
@@ -78,7 +78,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldThrowExceptionIfConsumerHasCurrentSubscription() throws Exception {
+    public void shouldThrowExceptionIfConsumerHasCurrentSubscription() {
         consumer.subscribe(Collections.singleton("sometopic"));
         try {
             changelogReader.restore();
@@ -89,7 +89,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() throws Exception {
+    public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
@@ -99,7 +99,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldRestoreMessagesFromCheckpoint() throws Exception {
+    public void shouldRestoreMessagesFromCheckpoint() {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, 5L, Long.MAX_VALUE, true,
@@ -110,7 +110,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldClearAssignmentAtEndOfRestore() throws Exception {
+    public void shouldClearAssignmentAtEndOfRestore() {
         final int messages = 1;
         setupConsumer(messages, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
@@ -121,7 +121,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldRestoreToLimitWhenSupplied() throws Exception {
+    public void shouldRestoreToLimitWhenSupplied() {
         setupConsumer(10, topicPartition);
         final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener, null, 3, true,
             "storeName");
@@ -132,7 +132,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldRestoreMultipleStores() throws Exception {
+    public void shouldRestoreMultipleStores() {
         final TopicPartition one = new TopicPartition("one", 0);
         final TopicPartition two = new TopicPartition("two", 0);
         final MockRestoreCallback callbackOne = new MockRestoreCallback();
@@ -206,7 +206,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldNotRestoreAnythingWhenPartitionIsEmpty() throws Exception {
+    public void shouldNotRestoreAnythingWhenPartitionIsEmpty() {
         final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
             "storeName");
@@ -219,7 +219,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() throws Exception {
+    public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() {
         final Long endOffset = 10L;
         setupConsumer(endOffset, topicPartition);
         final StateRestorer
@@ -234,7 +234,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldReturnRestoredOffsetsForPersistentStores() throws Exception {
+    public void shouldReturnRestoredOffsetsForPersistentStores() {
         setupConsumer(10, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
             "storeName"));
@@ -244,7 +244,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldNotReturnRestoredOffsetsForNonPersistentStore() throws Exception {
+    public void shouldNotReturnRestoredOffsetsForNonPersistentStore() {
         setupConsumer(10, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false,
             "storeName"));
@@ -254,7 +254,7 @@ public class StoreChangelogReaderTest {
     }
 
     @Test
-    public void shouldIgnoreNullKeysWhenRestoring() throws Exception {
+    public void shouldIgnoreNullKeysWhenRestoring() {
         assignPartition(3, topicPartition);
         final byte[] bytes = new byte[0];
         consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 0, bytes, bytes));
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index e638a48..d9a215c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -708,7 +708,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws Exception {
+    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() {
         final String myEndPoint = "localhost:j87yhk";
         final String applicationId = "application-id";
         builder.setApplicationId(applicationId);
@@ -760,7 +760,7 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test
-    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws Exception {
+    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() {
         final Cluster cluster = partitionAssignor.clusterMetadata();
         assertNotNull(cluster);
     }
@@ -968,12 +968,12 @@ public class StreamPartitionAssignorTest {
     }
 
     @Test(expected = KafkaException.class)
-    public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() throws Exception {
+    public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() {
         partitionAssignor.configure(Collections.singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1));
     }
 
     @Test(expected = KafkaException.class)
-    public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotThreadDataProviderInstance() throws Exception {
+    public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotThreadDataProviderInstance() {
         final Map config = new HashMap<>();
         config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         config.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, "i am not a stream thread");

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a9d43ba..2b719d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -137,7 +137,7 @@ public class StreamTaskTest {
         }
     };
 
-    private StreamsConfig createConfig(final boolean enableEoS) throws Exception {
+    private StreamsConfig createConfig(final boolean enableEoS) throws IOException {
         return new StreamsConfig(new Properties() {
             {
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-task-test");
@@ -153,7 +153,7 @@ public class StreamTaskTest {
     }
 
     @Before
-    public void setup() throws Exception {
+    public void setup() throws IOException {
         consumer.assign(Arrays.asList(partition1, partition2));
         source1.addChild(processorStreamTime);
         source2.addChild(processorStreamTime);
@@ -180,7 +180,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testProcessOrder() throws Exception {
+    public void testProcessOrder() {
         task.addRecords(partition1, records(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -236,7 +236,7 @@ public class StreamTaskTest {
 
     @Test
-    public void testMetrics() throws Exception {
+    public void testMetrics() {
         final String name = task.id().toString();
         final Map metricTags = new LinkedHashMap<>();
         metricTags.put("task-id", name);
@@ -252,7 +252,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testPauseResume() throws Exception {
+    public void testPauseResume() {
         task.addRecords(partition1, records(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
@@ -305,7 +305,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testMaybePunctuateStreamTime() throws Exception {
+    public void testMaybePunctuateStreamTime() {
         task.addRecords(partition1, records(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -368,7 +368,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testCancelPunctuateStreamTime() throws Exception {
+    public void testCancelPunctuateStreamTime() {
         task.addRecords(partition1, records(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -397,7 +397,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldPunctuateSystemTimeWhenIntervalElapsed() throws Exception {
+    public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
         long now = time.milliseconds();
         time.sleep(10);
         assertTrue(task.maybePunctuateSystemTime());
@@ -409,7 +409,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() throws Exception {
+    public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
         long now = time.milliseconds();
         assertTrue(task.maybePunctuateSystemTime()); // first time we always punctuate
         time.sleep(9);
@@ -418,7 +418,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void testCancelPunctuateSystemTime() throws Exception {
+    public void testCancelPunctuateSystemTime() {
         long now = time.milliseconds();
         time.sleep(10);
         assertTrue(task.maybePunctuateSystemTime());
@@ -430,7 +430,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() throws Exception {
+    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() {
         final MockSourceNode processorNode = new MockSourceNode(topic1, intDeserializer, intDeserializer) {
 
             @Override
@@ -476,7 +476,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings(value = {"unchecked", "deprecation"})
     @Test
-    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingDeprecated() throws Exception {
+    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingDeprecated() {
         final Processor processor = new AbstractProcessor() {
             @Override
             public void init(final ProcessorContext context) {
@@ -512,7 +512,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() throws Exception {
+    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() {
         final Processor processor = new AbstractProcessor() {
             @Override
             public void init(final ProcessorContext context) {
@@ -544,7 +544,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldFlushRecordCollectorOnFlushState() throws Exception {
+    public void shouldFlushRecordCollectorOnFlushState() {
         final AtomicBoolean flushed = new AtomicBoolean(false);
         final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
         final StreamTask streamTask = new StreamTask(taskId00, "appId", partitions, topology, consumer,
@@ -566,7 +566,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldCheckpointOffsetsOnCommit() throws Exception {
+    public void shouldCheckpointOffsetsOnCommit() throws IOException {
         final String storeName = "test";
         final String changelogTopic = ProcessorStateManager.storeChangelogTopic("appId", storeName);
         final InMemoryKeyValueStore inMemoryStore = new InMemoryKeyValueStore(storeName, null, null) {
@@ -628,7 +628,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() throws Exception {
+    public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
         final Map properties = config.originals();
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
         final StreamsConfig testConfig = new StreamsConfig(properties);
@@ -693,7 +693,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() throws Exception {
+    public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
         ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processorStreamTime);
         try {
             task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator);
@@ -704,7 +704,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldCallPunctuateOnPassedInProcessorNode() throws Exception {
+    public void shouldCallPunctuateOnPassedInProcessorNode() {
         task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator);
         assertThat(punctuatedAt, equalTo(5L));
         task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator);
@@ -712,13 +712,13 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldSetProcessorNodeOnContextBackToNullAfterSuccesfullPunctuate() throws Exception {
+    public void shouldSetProcessorNodeOnContextBackToNullAfterSuccesfullPunctuate() {
         task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator);
         assertThat(((ProcessorContextImpl) task.processorContext()).currentNode(), nullValue());
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() throws Exception {
+    public void shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() {
         task.schedule(1, PunctuationType.STREAM_TIME, new Punctuator() {
             @Override
             public void punctuate(long timestamp) {
@@ -728,7 +728,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() throws Exception {
+    public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() {
         ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processorStreamTime);
         task.schedule(1, PunctuationType.STREAM_TIME, new Punctuator() {
             @Override
@@ -739,7 +739,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() throws Exception {
+    public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() {
         task.close(true, false);
         task = createTaskThatThrowsExceptionOnClose();
         task.initialize();
@@ -755,7 +755,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() throws Exception {
+    public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, eosConfig,
             streamsMetrics, stateDirectory, null, time, producer);
@@ -765,7 +765,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() throws Exception {
+    public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config,
             streamsMetrics, stateDirectory, null, time, producer);
@@ -775,7 +775,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() throws Exception {
+    public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, eosConfig,
             streamsMetrics, stateDirectory, null, time, producer);
@@ -791,7 +791,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() throws Exception {
+    public void shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, eosConfig,
             streamsMetrics, stateDirectory, null, time, producer);
@@ -802,7 +802,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldNotSendOffsetsAndCommitTransactionNorStartNewTransactionOnSuspendIfEosDisabled() throws Exception {
+    public void shouldNotSendOffsetsAndCommitTransactionNorStartNewTransactionOnSuspendIfEosDisabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config,
             streamsMetrics, stateDirectory, null, time, producer);
@@ -818,7 +818,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldStartNewTransactionOnResumeIfEosEnabled() throws Exception {
+    public void shouldStartNewTransactionOnResumeIfEosEnabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, eosConfig,
             streamsMetrics, stateDirectory, null, time, producer);
@@ -833,7 +833,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldNotStartNewTransactionOnResumeIfEosDisabled() throws Exception {
+    public void shouldNotStartNewTransactionOnResumeIfEosDisabled() {
         final MockProducer producer =
             new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config,
             streamsMetrics, stateDirectory, null, time, producer);
@@ -848,7 +848,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldStartNewTransactionOnCommitIfEosEnabled() throws Exception {
+    public void shouldStartNewTransactionOnCommitIfEosEnabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, eosConfig,
             streamsMetrics, stateDirectory, null, time, producer);
@@ -862,7 +862,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldNotStartNewTransactionOnCommitIfEosDisabled() throws Exception {
+    public void shouldNotStartNewTransactionOnCommitIfEosDisabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config,
             streamsMetrics, stateDirectory, null, time, producer);
@@ -876,7 +876,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() throws Exception {
+    public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, eosConfig,
             streamsMetrics, stateDirectory, null, time, producer);
@@ -898,7 +898,7 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() throws Exception {
+    public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader, config,
             streamsMetrics, stateDirectory, null, time, producer);
@@ -909,7 +909,7 @@ public class StreamTaskTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldCloseProducerOnCloseWhenEosEnabled() throws Exception {
+    public void shouldCloseProducerOnCloseWhenEosEnabled() {
         final MockProducer producer = new MockProducer();
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index a7923f8..fd9b19d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -90,7 +90,7 @@ public class StreamThreadTest {
     private final ConsumedInternal consumed = new ConsumedInternal<>();
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         processId = UUID.randomUUID();
 
         internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(internalStreamsBuilder);
@@ -129,7 +129,7 @@ public class StreamThreadTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testPartitionAssignmentChangeForSingleGroup() throws Exception {
+    public void testPartitionAssignmentChangeForSingleGroup() {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, "topic1");
 
         final StreamThread thread = getStreamThread();
@@ -223,7 +223,7 @@ public class StreamThreadTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testPartitionAssignmentChangeForMultipleGroups() throws Exception {
+    public void testPartitionAssignmentChangeForMultipleGroups() {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, "topic1");
         internalTopologyBuilder.addSource(null, "source2", null, null, null, "topic2");
         internalTopologyBuilder.addSource(null, "source3", null, null, null, "topic3");
@@ -744,7 +744,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() throws Exception {
+    public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() {
         internalStreamsBuilder.stream(Collections.singleton("t1"), consumed).groupByKey().count("count-one");
         internalStreamsBuilder.stream(Collections.singleton("t2"), consumed).groupByKey().count("count-two");
 
@@ -810,7 +810,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws Exception {
+    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws InterruptedException {
         internalTopologyBuilder.addSource(null, "source", null, null, null, TOPIC);
         internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, null, "source");
 
@@ -871,7 +871,7 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed() throws Exception {
+    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed() {
         internalTopologyBuilder.addSource(null, "name", null, null, null, "topic");
         internalTopologyBuilder.addSink("out", "output", null, null, null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index ccba038..b22c488 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -127,12 +127,12 @@ public class StreamsMetadataStateTest {
     }
 
     @Test
-    public void shouldNotThrowNPEWhenOnChangeNotCalled() throws Exception {
+    public void shouldNotThrowNPEWhenOnChangeNotCalled() {
         new StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), hostOne).getAllMetadataForStore("store");
     }
 
     @Test
-    public void shouldGetAllStreamInstances() throws Exception {
+    public void shouldGetAllStreamInstances() {
         final StreamsMetadata one = new StreamsMetadata(hostOne, Utils.mkSet(globalTable, "table-one", "table-two", "merged-table"),
             Utils.mkSet(topic1P0, topic2P1, topic4P0));
         final StreamsMetadata two = new StreamsMetadata(hostTwo, Utils.mkSet(globalTable, "table-two", "table-one", "merged-table"),
@@ -148,7 +148,7 @@ public class StreamsMetadataStateTest {
     }
 
     @Test
-    public void shouldGetAllStreamsInstancesWithNoStores() throws Exception {
+    public void shouldGetAllStreamsInstancesWithNoStores() {
         builder.stream("topic-five").filter(new Predicate() {
             @Override
             public boolean
test(final Object key, final Object value) {
@@ -169,7 +169,7 @@ public class StreamsMetadataStateTest {
     }
 
     @Test
-    public void shouldGetInstancesForStoreName() throws Exception {
+    public void shouldGetInstancesForStoreName() {
         final StreamsMetadata one = new StreamsMetadata(hostOne, Utils.mkSet(globalTable, "table-one", "table-two", "merged-table"),
             Utils.mkSet(topic1P0, topic2P1, topic4P0));
         final StreamsMetadata two = new StreamsMetadata(hostTwo, Utils.mkSet(globalTable, "table-two", "table-one", "merged-table"),
@@ -181,18 +181,18 @@ public class StreamsMetadataStateTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldThrowIfStoreNameIsNullOnGetAllInstancesWithStore() throws Exception {
+    public void shouldThrowIfStoreNameIsNullOnGetAllInstancesWithStore() {
         discovery.getAllMetadataForStore(null);
     }
 
     @Test
-    public void shouldReturnEmptyCollectionOnGetAllInstancesWithStoreWhenStoreDoesntExist() throws Exception {
+    public void shouldReturnEmptyCollectionOnGetAllInstancesWithStoreWhenStoreDoesntExist() {
         final Collection actual = discovery.getAllMetadataForStore("not-a-store");
         assertTrue(actual.isEmpty());
     }
 
     @Test
-    public void shouldGetInstanceWithKey() throws Exception {
+    public void shouldGetInstanceWithKey() {
         final TopicPartition tp4 = new TopicPartition("topic-three", 1);
 
         hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4));
@@ -209,7 +209,7 @@ public class StreamsMetadataStateTest {
     }
 
     @Test
-    public void shouldGetInstanceWithKeyAndCustomPartitioner() throws Exception {
+    public void shouldGetInstanceWithKeyAndCustomPartitioner() {
         final TopicPartition tp4 = new TopicPartition("topic-three", 1);
 
         hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4));
@@ -223,14 +223,14 @@ public class StreamsMetadataStateTest {
     }
 
     @Test
-    public void shouldReturnNotAvailableWhenClusterIsEmpty() throws Exception {
+    public void shouldReturnNotAvailableWhenClusterIsEmpty() {
         discovery.onChange(Collections.<HostInfo, Set<TopicPartition>>emptyMap(), Cluster.empty());
         final StreamsMetadata result = discovery.getMetadataWithKey("table-one", "a", Serdes.String().serializer());
         assertEquals(StreamsMetadata.NOT_AVAILABLE, result);
     }
 
     @Test
-    public void shouldGetInstanceWithKeyWithMergedStreams() throws Exception {
+    public void shouldGetInstanceWithKeyWithMergedStreams() {
         final TopicPartition topic2P2 = new TopicPartition("topic-two", 2);
         hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1, topic2P2));
         discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(topic2P2, new PartitionInfo("topic-two", 2, null, null, null))));
@@ -250,7 +250,7 @@ public class StreamsMetadataStateTest {
     }
 
     @Test
-    public void shouldReturnNullOnGetWithKeyWhenStoreDoesntExist() throws Exception {
+    public void shouldReturnNullOnGetWithKeyWhenStoreDoesntExist() {
         final StreamsMetadata actual = discovery.getMetadataWithKey("not-a-store",
             "key",
             Serdes.String().serializer());
@@ -258,28 +258,28 @@ public class StreamsMetadataStateTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldThrowWhenKeyIsNull() throws Exception {
+    public void shouldThrowWhenKeyIsNull() {
         discovery.getMetadataWithKey("table-three", null, Serdes.String().serializer());
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldThrowWhenSerializerIsNull() throws Exception {
+    public void shouldThrowWhenSerializerIsNull() {
         discovery.getMetadataWithKey("table-three", "key", (Serializer) null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldThrowIfStoreNameIsNull() throws Exception {
Exception { + public void shouldThrowIfStoreNameIsNull() { discovery.getMetadataWithKey(null, "key", Serdes.String().serializer()); } @SuppressWarnings("unchecked") @Test(expected = NullPointerException.class) - public void shouldThrowIfStreamPartitionerIsNull() throws Exception { + public void shouldThrowIfStreamPartitionerIsNull() { discovery.getMetadataWithKey(null, "key", (StreamPartitioner) null); } @Test - public void shouldHaveGlobalStoreInAllMetadata() throws Exception { + public void shouldHaveGlobalStoreInAllMetadata() { final Collection metadata = discovery.getAllMetadataForStore(globalTable); assertEquals(3, metadata.size()); for (StreamsMetadata streamsMetadata : metadata) { @@ -288,26 +288,26 @@ public class StreamsMetadataStateTest { } @Test - public void shouldGetMyMetadataForGlobalStoreWithKey() throws Exception { + public void shouldGetMyMetadataForGlobalStoreWithKey() { final StreamsMetadata metadata = discovery.getMetadataWithKey(globalTable, "key", Serdes.String().serializer()); assertEquals(hostOne, metadata.hostInfo()); } @Test - public void shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() throws Exception { + public void shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() { final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), StreamsMetadataState.UNKNOWN_HOST); streamsMetadataState.onChange(hostToPartitions, cluster); assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", Serdes.String().serializer())); } @Test - public void shouldGetMyMetadataForGlobalStoreWithKeyAndPartitioner() throws Exception { + public void shouldGetMyMetadataForGlobalStoreWithKeyAndPartitioner() { final StreamsMetadata metadata = discovery.getMetadataWithKey(globalTable, "key", partitioner); assertEquals(hostOne, metadata.hostInfo()); } @Test - public void shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() throws Exception { + public void shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() { final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), StreamsMetadataState.UNKNOWN_HOST); streamsMetadataState.onChange(hostToPartitions, cluster); assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", partitioner)); http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java index 0e87a6d..215d4f1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java @@ -31,7 +31,7 @@ import static org.junit.Assert.assertEquals; public class StreamsMetricsImplTest { @Test(expected = NullPointerException.class) - public void testNullMetrics() throws Exception { + public void testNullMetrics() { String groupName = "doesNotMatter"; Map tags = new HashMap<>(); StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(null, groupName, tags); 
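
A note on the pattern in the hunks above: each test now declares the narrowest checked exception its body can actually throw (IOException, InterruptedException), or drops the throws clause entirely, while @Test(expected = ...) keeps pinning down the exact unchecked exception in the failure cases. The following minimal JUnit 4 sketch is not part of this patch; the test class and its contents are hypothetical and only illustrate both halves of the pattern:

// Illustrative only -- hypothetical test class, not from this commit.
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

import org.junit.Test;

import static org.junit.Assert.assertTrue;

public class NarrowThrowsExampleTest {

    @Test
    public void shouldDeclareOnlyIOException() throws IOException {
        // Files.createTempFile can only fail with an IOException, so that is
        // all the signature needs to declare -- not a blanket `throws Exception`.
        final File tmp = Files.createTempFile("example", ".tmp").toFile();
        assertTrue(tmp.exists());
        assertTrue(tmp.delete());
    }

    @Test(expected = NullPointerException.class)
    public void shouldFailFastOnNull() {
        // Unchecked exceptions need no throws clause; the `expected` attribute
        // asserts that exactly this exception type is raised by the body.
        final String s = null;
        s.length();
    }
}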
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index 9473a40..ec94ad8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -52,7 +52,7 @@ public class AssignmentInfoTest {
     }

     @Test
-    public void shouldDecodePreviousVersion() throws Exception {
+    public void shouldDecodePreviousVersion() throws IOException {
         List<TaskId> activeTasks = Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0));
         Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
index d0743f1..034ae7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
@@ -32,19 +32,19 @@ public class ClientStateTest {
     private final ClientState client = new ClientState(1);

     @Test
-    public void shouldHaveNotReachedCapacityWhenAssignedTasksLessThanCapacity() throws Exception {
+    public void shouldHaveNotReachedCapacityWhenAssignedTasksLessThanCapacity() {
         assertFalse(client.reachedCapacity());
     }

     @Test
-    public void shouldHaveReachedCapacityWhenAssignedTasksGreaterThanOrEqualToCapacity() throws Exception {
+    public void shouldHaveReachedCapacityWhenAssignedTasksGreaterThanOrEqualToCapacity() {
         client.assign(new TaskId(0, 1), true);
         assertTrue(client.reachedCapacity());
     }

     @Test
-    public void shouldAddActiveTasksToBothAssignedAndActive() throws Exception {
+    public void shouldAddActiveTasksToBothAssignedAndActive() {
         final TaskId tid = new TaskId(0, 1);

         client.assign(tid, true);
@@ -55,7 +55,7 @@ public class ClientStateTest {
     }

     @Test
-    public void shouldAddStandbyTasksToBothStandbyAndActive() throws Exception {
+    public void shouldAddStandbyTasksToBothStandbyAndActive() {
         final TaskId tid = new TaskId(0, 1);

         client.assign(tid, false);
@@ -66,7 +66,7 @@ public class ClientStateTest {
     }

     @Test
-    public void shouldAddPreviousActiveTasksToPreviousAssignedAndPreviousActive() throws Exception {
+    public void shouldAddPreviousActiveTasksToPreviousAssignedAndPreviousActive() {
         final TaskId tid1 = new TaskId(0, 1);
         final TaskId tid2 = new TaskId(0, 2);
@@ -76,7 +76,7 @@ public class ClientStateTest {
     }

     @Test
-    public void shouldAddPreviousStandbyTasksToPreviousAssigned() throws Exception {
+    public void shouldAddPreviousStandbyTasksToPreviousAssigned() {
         final TaskId tid1 = new TaskId(0, 1);
         final TaskId tid2 = new TaskId(0, 2);
@@ -86,7 +86,7 @@ public class ClientStateTest {
     }

     @Test
-    public void shouldHaveAssignedTaskIfActiveTaskAssigned() throws Exception {
+    public void shouldHaveAssignedTaskIfActiveTaskAssigned() {
         final TaskId tid = new TaskId(0, 2);

         client.assign(tid, true);
@@ -94,7 +94,7 @@ public class ClientStateTest {
     }

     @Test
-    public void shouldHaveAssignedTaskIfStandbyTaskAssigned() throws Exception {
+    public void shouldHaveAssignedTaskIfStandbyTaskAssigned() {
         final TaskId tid = new TaskId(0, 2);

         client.assign(tid, false);
@@ -102,14 +102,14 @@ public class ClientStateTest {
     }

     @Test
-    public void shouldNotHaveAssignedTaskIfTaskNotAssigned() throws Exception {
+    public void shouldNotHaveAssignedTaskIfTaskNotAssigned() {
         client.assign(new TaskId(0, 2), true);
         assertFalse(client.hasAssignedTask(new TaskId(0, 3)));
     }

     @Test
-    public void shouldHaveMoreAvailableCapacityWhenCapacityTheSameButFewerAssignedTasks() throws Exception {
+    public void shouldHaveMoreAvailableCapacityWhenCapacityTheSameButFewerAssignedTasks() {
         final ClientState c2 = new ClientState(1);
         client.assign(new TaskId(0, 1), true);
         assertTrue(c2.hasMoreAvailableCapacityThan(client));
@@ -117,14 +117,14 @@ public class ClientStateTest {
     }

     @Test
-    public void shouldHaveMoreAvailableCapacityWhenCapacityHigherAndSameAssignedTaskCount() throws Exception {
+    public void shouldHaveMoreAvailableCapacityWhenCapacityHigherAndSameAssignedTaskCount() {
         final ClientState c2 = new ClientState(2);
         assertTrue(c2.hasMoreAvailableCapacityThan(client));
         assertFalse(client.hasMoreAvailableCapacityThan(c2));
     }

     @Test
-    public void shouldUseMultiplesOfCapacityToDetermineClientWithMoreAvailableCapacity() throws Exception {
+    public void shouldUseMultiplesOfCapacityToDetermineClientWithMoreAvailableCapacity() {
         final ClientState c2 = new ClientState(2);

         for (int i = 0; i < 7; i++) {
@@ -139,7 +139,7 @@ public class ClientStateTest {
     }

     @Test
-    public void shouldHaveMoreAvailableCapacityWhenCapacityIsTheSameButAssignedTasksIsLess() throws Exception {
+    public void shouldHaveMoreAvailableCapacityWhenCapacityIsTheSameButAssignedTasksIsLess() {
         final ClientState c1 = new ClientState(3);
         final ClientState c2 = new ClientState(3);
         for (int i = 0; i < 4; i++) {
@@ -151,26 +151,26 @@ public class ClientStateTest {
     }

     @Test(expected = IllegalStateException.class)
-    public void shouldThrowIllegalStateExceptionIfCapacityOfThisClientStateIsZero() throws Exception {
+    public void shouldThrowIllegalStateExceptionIfCapacityOfThisClientStateIsZero() {
         final ClientState c1 = new ClientState(0);
         c1.hasMoreAvailableCapacityThan(new ClientState(1));
     }

     @Test(expected = IllegalStateException.class)
-    public void shouldThrowIllegalStateExceptionIfCapacityOfOtherClientStateIsZero() throws Exception {
+    public void shouldThrowIllegalStateExceptionIfCapacityOfOtherClientStateIsZero() {
         final ClientState c1 = new ClientState(1);
         c1.hasMoreAvailableCapacityThan(new ClientState(0));
     }

     @Test
-    public void shouldHaveUnfulfilledQuotaWhenActiveTaskSizeLessThanCapacityTimesTasksPerThread() throws Exception {
+    public void shouldHaveUnfulfilledQuotaWhenActiveTaskSizeLessThanCapacityTimesTasksPerThread() {
         final ClientState client = new ClientState(1);
         client.assign(new TaskId(0, 1), true);
         assertTrue(client.hasUnfulfilledQuota(2));
     }

     @Test
-    public void shouldNotHaveUnfulfilledQuotaWhenActiveTaskSizeGreaterEqualThanCapacityTimesTasksPerThread() throws Exception {
+    public void shouldNotHaveUnfulfilledQuotaWhenActiveTaskSizeGreaterEqualThanCapacityTimesTasksPerThread() {
         final ClientState client = new ClientState(1);
         client.assign(new TaskId(0, 1), true);
         assertFalse(client.hasUnfulfilledQuota(1));
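
For context on what the ClientStateTest hunks above exercise: ClientState tracks how many tasks a client holds relative to its capacity (its thread count), and hasMoreAvailableCapacityThan compares clients by assigned tasks per unit of capacity, which is why the tests mix capacities of 1, 2, and 3. The sketch below is a simplified standalone reading of that comparison, not the real ClientState; all names in it are illustrative:

// Illustrative only -- a toy stand-in for the capacity bookkeeping, not the
// real org.apache.kafka.streams ClientState.
public class ClientLoadSketch {
    private final int capacity;  // e.g. number of stream threads on the client
    private int assignedTasks;   // active + standby tasks

    public ClientLoadSketch(final int capacity) {
        this.capacity = capacity;
    }

    public void assign() {
        assignedTasks++;
    }

    public boolean reachedCapacity() {
        // full once it holds at least one task per unit of capacity
        return assignedTasks >= capacity;
    }

    public boolean hasMoreAvailableCapacityThan(final ClientLoadSketch other) {
        if (capacity <= 0 || other.capacity <= 0) {
            throw new IllegalStateException("capacity must be positive");
        }
        // compare load factors assignedTasks / capacity without floating point:
        // a/b < c/d  <=>  a*d < c*b for positive b, d
        return (long) assignedTasks * other.capacity < (long) other.assignedTasks * capacity;
    }

    public static void main(final String[] args) {
        final ClientLoadSketch oneThread = new ClientLoadSketch(1);
        final ClientLoadSketch twoThreads = new ClientLoadSketch(2);
        oneThread.assign();
        twoThreads.assign();
        // load 1/1 vs 1/2: the two-thread client still has headroom
        System.out.println(twoThreads.hasMoreAvailableCapacityThan(oneThread)); // true
    }
}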
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 449dabd..86af0be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -51,7 +51,7 @@ public class StickyTaskAssignorTest {
     private final Integer p4 = 4;

     @Test
-    public void shouldAssignOneActiveTaskToEachProcessWhenTaskCountSameAsProcessCount() throws Exception {
+    public void shouldAssignOneActiveTaskToEachProcessWhenTaskCountSameAsProcessCount() {
         createClient(p1, 1);
         createClient(p2, 1);
         createClient(p3, 1);
@@ -65,7 +65,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldNotMigrateActiveTaskToOtherProcess() throws Exception {
+    public void shouldNotMigrateActiveTaskToOtherProcess() {
         createClientWithPreviousActiveTasks(p1, 1, task00);
         createClientWithPreviousActiveTasks(p2, 1, task01);
@@ -91,7 +91,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldMigrateActiveTasksToNewProcessWithoutChangingAllAssignments() throws Exception {
+    public void shouldMigrateActiveTasksToNewProcessWithoutChangingAllAssignments() {
         createClientWithPreviousActiveTasks(p1, 1, task00, task02);
         createClientWithPreviousActiveTasks(p2, 1, task01);
         createClient(p3, 1);
@@ -107,7 +107,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldAssignBasedOnCapacity() throws Exception {
+    public void shouldAssignBasedOnCapacity() {
         createClient(p1, 1);
         createClient(p2, 2);
         final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02);
@@ -152,7 +152,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldAssignTasksToClientWithPreviousStandbyTasks() throws Exception {
+    public void shouldAssignTasksToClientWithPreviousStandbyTasks() {
         final ClientState client1 = createClient(p1, 1);
         client1.addPreviousStandbyTasks(Utils.mkSet(task02));
         final ClientState client2 = createClient(p2, 1);
@@ -170,7 +170,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldAssignBasedOnCapacityWhenMultipleClientHaveStandbyTasks() throws Exception {
+    public void shouldAssignBasedOnCapacityWhenMultipleClientHaveStandbyTasks() {
         final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00);
         c1.addPreviousStandbyTasks(Utils.mkSet(task01));
         final ClientState c2 = createClientWithPreviousActiveTasks(p2, 2, task02);
@@ -185,7 +185,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldAssignStandbyTasksToDifferentClientThanCorrespondingActiveTaskIsAssingedTo() throws Exception {
+    public void shouldAssignStandbyTasksToDifferentClientThanCorrespondingActiveTaskIsAssingedTo() {
         createClientWithPreviousActiveTasks(p1, 1, task00);
         createClientWithPreviousActiveTasks(p2, 1, task01);
         createClientWithPreviousActiveTasks(p3, 1, task02);
@@ -215,7 +215,7 @@ public class StickyTaskAssignorTest {

     @Test
-    public void shouldAssignMultipleReplicasOfStandbyTask() throws Exception {
+    public void shouldAssignMultipleReplicasOfStandbyTask() {
         createClientWithPreviousActiveTasks(p1, 1, task00);
         createClientWithPreviousActiveTasks(p2, 1, task01);
         createClientWithPreviousActiveTasks(p3, 1, task02);
@@ -229,7 +229,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldNotAssignStandbyTaskReplicasWhenNoClientAvailableWithoutHavingTheTaskAssigned() throws Exception {
+    public void shouldNotAssignStandbyTaskReplicasWhenNoClientAvailableWithoutHavingTheTaskAssigned() {
         createClient(p1, 1);
         final StickyTaskAssignor taskAssignor = createTaskAssignor(task00);
         taskAssignor.assign(1);
@@ -237,7 +237,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldAssignActiveAndStandbyTasks() throws Exception {
+    public void shouldAssignActiveAndStandbyTasks() {
         createClient(p1, 1);
         createClient(p2, 1);
         createClient(p3, 1);
@@ -251,7 +251,7 @@ public class StickyTaskAssignorTest {

     @Test
-    public void shouldAssignAtLeastOneTaskToEachClientIfPossible() throws Exception {
+    public void shouldAssignAtLeastOneTaskToEachClientIfPossible() {
         createClient(p1, 3);
         createClient(p2, 1);
         createClient(p3, 1);
@@ -264,7 +264,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldAssignEachActiveTaskToOneClientWhenMoreClientsThanTasks() throws Exception {
+    public void shouldAssignEachActiveTaskToOneClientWhenMoreClientsThanTasks() {
         createClient(p1, 1);
         createClient(p2, 1);
         createClient(p3, 1);
@@ -279,7 +279,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldBalanceActiveAndStandbyTasksAcrossAvailableClients() throws Exception {
+    public void shouldBalanceActiveAndStandbyTasksAcrossAvailableClients() {
         createClient(p1, 1);
         createClient(p2, 1);
         createClient(p3, 1);
@@ -296,7 +296,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldAssignMoreTasksToClientWithMoreCapacity() throws Exception {
+    public void shouldAssignMoreTasksToClientWithMoreCapacity() {
         createClient(p2, 2);
         createClient(p1, 1);
@@ -320,7 +320,7 @@ public class StickyTaskAssignorTest {

     @Test
-    public void shouldNotHaveSameAssignmentOnAnyTwoHosts() throws Exception {
+    public void shouldNotHaveSameAssignmentOnAnyTwoHosts() {
         createClient(p1, 1);
         createClient(p2, 1);
         createClient(p3, 1);
@@ -342,7 +342,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousActiveTasks() throws Exception {
+    public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousActiveTasks() {
         createClientWithPreviousActiveTasks(p1, 1, task01, task02);
         createClientWithPreviousActiveTasks(p2, 1, task03);
         createClientWithPreviousActiveTasks(p3, 1, task00);
@@ -364,7 +364,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() throws Exception {
+    public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() {
         final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task02);
         c1.addPreviousStandbyTasks(Utils.mkSet(task03, task00));
         final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task03, task00);
@@ -389,7 +389,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldReBalanceTasksAcrossAllClientsWhenCapacityAndTaskCountTheSame() throws Exception {
+    public void shouldReBalanceTasksAcrossAllClientsWhenCapacityAndTaskCountTheSame() {
         createClientWithPreviousActiveTasks(p3, 1, task00, task01, task02, task03);
         createClient(p1, 1);
         createClient(p2, 1);
@@ -405,7 +405,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldReBalanceTasksAcrossClientsWhenCapacityLessThanTaskCount() throws Exception {
+    public void shouldReBalanceTasksAcrossClientsWhenCapacityLessThanTaskCount() {
         createClientWithPreviousActiveTasks(p3, 1, task00, task01, task02, task03);
         createClient(p1, 1);
         createClient(p2, 1);
@@ -419,7 +419,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldRebalanceTasksToClientsBasedOnCapacity() throws Exception {
+    public void shouldRebalanceTasksToClientsBasedOnCapacity() {
         createClientWithPreviousActiveTasks(p2, 1, task00, task03, task02);
         createClient(p3, 2);
         final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task02, task03);
@@ -429,7 +429,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldMoveMinimalNumberOfTasksWhenPreviouslyAboveCapacityAndNewClientAdded() throws Exception {
+    public void shouldMoveMinimalNumberOfTasksWhenPreviouslyAboveCapacityAndNewClientAdded() {
         final Set<TaskId> p1PrevTasks = Utils.mkSet(task00, task02);
         final Set<TaskId> p2PrevTasks = Utils.mkSet(task01, task03);
@@ -450,7 +450,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldNotMoveAnyTasksWhenNewTasksAdded() throws Exception {
+    public void shouldNotMoveAnyTasksWhenNewTasksAdded() {
         createClientWithPreviousActiveTasks(p1, 1, task00, task01);
         createClientWithPreviousActiveTasks(p2, 1, task02, task03);
@@ -462,7 +462,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldAssignNewTasksToNewClientWhenPreviousTasksAssignedToOldClients() throws Exception {
+    public void shouldAssignNewTasksToNewClientWhenPreviousTasksAssignedToOldClients() {
         createClientWithPreviousActiveTasks(p1, 1, task02, task01);
         createClientWithPreviousActiveTasks(p2, 1, task00, task03);
@@ -477,7 +477,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldAssignTasksNotPreviouslyActiveToNewClient() throws Exception {
+    public void shouldAssignTasksNotPreviouslyActiveToNewClient() {
         final TaskId task10 = new TaskId(0, 10);
         final TaskId task11 = new TaskId(0, 11);
         final TaskId task12 = new TaskId(1, 2);
@@ -507,7 +507,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldAssignTasksNotPreviouslyActiveToMultipleNewClients() throws Exception {
+    public void shouldAssignTasksNotPreviouslyActiveToMultipleNewClients() {
         final TaskId task10 = new TaskId(0, 10);
         final TaskId task11 = new TaskId(0, 11);
         final TaskId task12 = new TaskId(1, 2);
@@ -538,7 +538,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldAssignTasksToNewClient() throws Exception {
+    public void shouldAssignTasksToNewClient() {
         createClientWithPreviousActiveTasks(p1, 1, task01, task02);
         createClient(p2, 1);
         createTaskAssignor(task01, task02).assign(0);
@@ -546,7 +546,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingClients() throws Exception {
+    public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingClients() {
         final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02);
         final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task03, task04, task05);
         final ClientState newClient = createClient(p3, 1);
@@ -565,7 +565,7 @@ public class StickyTaskAssignorTest {
     }

     @Test
-    public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingAndBouncedClients() throws Exception {
+    public void shouldAssignTasksToNewClientWithoutFlippingAssignmentBetweenExistingAndBouncedClients() {
         final TaskId task06 = new TaskId(0, 6);
         final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02, task06);
         final ClientState c2 = createClient(p2, 1);
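
(The StickyTaskAssignorTest diff is truncated here.) These tests assert the assignor's "sticky" property: a task stays with the client that previously ran it, and only new or orphaned tasks fall to the least-loaded client. The toy sketch below illustrates that idea only; the names are hypothetical and it has none of the real assignor's standby or balancing logic:

// Illustrative only -- a toy version of sticky assignment, not the real
// org.apache.kafka.streams StickyTaskAssignor.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StickySketch {

    /** Assign each task to its previous owner when possible, else to the least-loaded client. */
    static Map<String, List<Integer>> assign(final List<String> clients,
                                             final List<Integer> tasks,
                                             final Map<Integer, String> previousOwner) {
        final Map<String, List<Integer>> assignment = new HashMap<>();
        for (final String c : clients) {
            assignment.put(c, new ArrayList<Integer>());
        }
        for (final Integer task : tasks) {
            final String prev = previousOwner.get(task);
            // sticky step: keep the task where it ran before, if that client is still alive
            String target = (prev != null && assignment.containsKey(prev)) ? prev : null;
            if (target == null) {
                // fallback: give new or orphaned tasks to the least-loaded client
                for (final String c : clients) {
                    if (target == null || assignment.get(c).size() < assignment.get(target).size()) {
                        target = c;
                    }
                }
            }
            assignment.get(target).add(task);
        }
        return assignment;
    }

    public static void main(final String[] args) {
        final Map<Integer, String> previous = new HashMap<>();
        previous.put(0, "clientA");  // task 0 ran on clientA before the rebalance
        // task 0 sticks to clientA; the new task 1 goes to the idle clientB
        System.out.println(assign(Arrays.asList("clientA", "clientB"), Arrays.asList(0, 1), previous));
    }
}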