kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: MINOR: Add code quality checks (and suppressions) to checkstyle.xml
Date Tue, 28 Feb 2017 23:02:16 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a10d7b1b7 -> de05c9d3a


MINOR: Add code quality checks (and suppressions) to checkstyle.xml

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ewen Cheslack-Postava <me@ewencp.org>, Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #2594 from dguy/checkstyle
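
For context, a minimal, hypothetical sketch (class and parameter names invented here, not
part of this commit) of the kind of signature the ParameterNumber check added below would
flag, given that its limit is raised to 10 arguments:

    // Hypothetical illustration only. With ParameterNumber max=10, a method
    // declaring more than ten parameters fails checkstyle unless a matching
    // <suppress> entry exists in checkstyle/suppressions.xml.
    public class RecordMetadataExample {
        void describe(String topic, int partition, long baseOffset, long timestamp,
                      int keySize, int valueSize, String clientId, String groupId,
                      boolean transactional, int epoch, long sequence) {
            // eleven parameters -> ParameterNumber violation
        }
    }

Since build.gradle wires test.dependsOn('checkstyleMain', 'checkstyleTest'), such
violations fail the regular test build unless suppressed.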


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/de05c9d3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/de05c9d3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/de05c9d3

Branch: refs/heads/trunk
Commit: de05c9d3a0d79a555088afe9344b52e002d287f2
Parents: a10d7b1
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Feb 28 22:55:46 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Tue Feb 28 22:57:57 2017 +0000

----------------------------------------------------------------------
 build.gradle                                    |   5 +-
 checkstyle/checkstyle.xml                       |  36 +++
 checkstyle/suppressions.xml                     | 229 +++++++++++++++++++
 .../kafka/common/protocol/types/Struct.java     |   2 +-
 .../SaslClientCallbackHandler.java              |   2 +-
 .../clients/producer/ProducerRecordTest.java    |  12 +-
 .../org/apache/kafka/streams/StreamsConfig.java |   6 +-
 .../kstream/internals/SessionKeySerde.java      |   6 +-
 .../streams/processor/TopologyBuilder.java      |   4 +-
 .../KStreamAggregationDedupIntegrationTest.java |  12 +-
 .../KStreamAggregationIntegrationTest.java      |  24 +-
 .../internals/StreamsMetadataStateTest.java     |   7 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   2 +-
 13 files changed, 307 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 417383d..fa2ed96 100644
--- a/build.gradle
+++ b/build.gradle
@@ -109,7 +109,8 @@ if (new File('.git').exists()) {
         'gradlew.bat',
         '**/README.md',
         '**/id_rsa',
-        '**/id_rsa.pub'
+        '**/id_rsa.pub',
+        'checkstyle/suppressions.xml'
     ])
   }
 }
@@ -272,6 +273,8 @@ subprojects {
   checkstyle {
     configFile = new File(rootDir, "checkstyle/checkstyle.xml")
     configProperties = [importControlFile: "$rootDir/checkstyle/import-control.xml"]
+    // version 7.x requires Java 8
+    toolVersion = '6.19'
   }
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 51b613d..9a4a37f 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -93,5 +93,41 @@
       <property name="illegalPattern" value="true"/>
       <property name="ignoreComments" value="true"/>
     </module>
+
+    <!-- code quality -->
+    <module name="MethodLength"/>
+    <module name="ParameterNumber">
+      <!-- default is 8 -->
+      <property name="max" value="10"/>
+    </module>
+    <module name="ClassDataAbstractionCoupling">
+      <!-- default is 7 -->
+      <property name="max" value="15"/>
+    </module>
+    <module name="BooleanExpressionComplexity">
+      <!-- default is 3 -->
+      <property name="max" value="4"/>
+    </module>
+
+    <module name="ClassFanOutComplexity">
+      <!-- default is 20 -->
+      <property name="max" value="35"/>
+    </module>
+    <module name="CyclomaticComplexity">
+      <!-- default is 10-->
+      <property name="max" value="15"/>
+    </module>
+    <module name="JavaNCSS">
+      <!-- default is 50 -->
+      <property name="methodMaximum" value="100"/>
+    </module>
+    <module name="NPathComplexity">
+      <!-- default is 200 -->
+      <property name="max" value="500"/>
+    </module>
+  </module>
+
+  <module name="SuppressionFilter">
+    <property name="file" value="checkstyle/suppressions.xml"/>
   </module>
 </module>
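
As a rough illustration of the new BooleanExpressionComplexity limit of 4 (a hypothetical
sketch, not code from this commit): the check counts the boolean operators inside a single
expression, which is why classes such as KafkaLZ4BlockOutputStream, JsonConverter and
SmokeTestDriver receive suppressions in the new file below:

    // Hypothetical illustration only.
    public class FlagCheckExample {
        boolean tooComplex(boolean a, boolean b, boolean c, boolean d, boolean e, boolean f) {
            return a && b && c && d && e && f; // five operators, max is 4 -> violation
        }
        boolean withinLimit(boolean a, boolean b, boolean c, boolean d, boolean e) {
            return a && b && c && d && e;      // four operators -> passes
        }
    }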

http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
new file mode 100644
index 0000000..9f13307
--- /dev/null
+++ b/checkstyle/suppressions.xml
@@ -0,0 +1,229 @@
+<?xml version="1.0"?>
+
+<!DOCTYPE suppressions PUBLIC
+        "-//Puppy Crawl//DTD Suppressions 1.1//EN"
+        "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
+
+<suppressions>
+
+    <!-- Clients -->
+    <suppress checks="ClassFanOutComplexity"
+              files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator).java"/>
+    <suppress checks="ClassFanOutComplexity"
+              files=".*/protocol/Errors.java"/>
+
+    <suppress checks="MethodLength"
+              files="KerberosLogin.java"/>
+
+    <suppress checks="ParameterNumber"
+              files="NetworkClient.java"/>
+    <suppress checks="ParameterNumber"
+              files="KafkaConsumer.java"/>
+    <suppress checks="ParameterNumber"
+              files="ConsumerCoordinator.java"/>
+    <suppress checks="ParameterNumber"
+              files="Fetcher.java"/>
+    <suppress checks="ParameterNumber"
+              files="ConfigDef.java"/>
+
+    <suppress checks="ClassDataAbstractionCoupling"
+              files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse).java"/>
+    <suppress checks="ClassDataAbstractionCoupling"
+              files=".*/protocol/Errors.java"/>
+
+    <suppress checks="BooleanExpressionComplexity"
+              files="KafkaLZ4BlockOutputStream.java"/>
+
+    <suppress checks="CyclomaticComplexity"
+              files="ConsumerCoordinator.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="Fetcher.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="KafkaProducer.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="BufferPool.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="RecordAccumulator.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="ConfigDef.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="Selector.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="SslTransportLayer.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="KerberosLogin.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="AbstractRequest.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="AbstractResponse.java"/>
+
+    <suppress checks="JavaNCSS"
+              files="KerberosLogin.java"/>
+
+    <suppress checks="NPathComplexity"
+              files="BufferPool.java"/>
+    <suppress checks="NPathComplexity"
+              files="MetricName.java"/>
+    <suppress checks="NPathComplexity"
+              files="Node.java"/>
+    <suppress checks="NPathComplexity"
+              files="ConfigDef.java"/>
+    <suppress checks="NPathComplexity"
+              files="SslTransportLayer.java"/>
+    <suppress checks="NPathComplexity"
+              files="MetadataResponse.java"/>
+    <suppress checks="NPathComplexity"
+              files="KerberosLogin.java"/>
+    <suppress checks="NPathComplexity"
+              files="SslTransportLayer.java"/>
+
+    <!-- clients tests -->
+    <suppress checks="ClassDataAbstractionCoupling"
+              files="(Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse)Test.java"/>
+
+    <suppress checks="ClassFanOutComplexity"
+              files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher)Test.java"/>
+
+    <!-- Connect -->
+    <suppress checks="ClassFanOutComplexity"
+              files="DistributedHerder.java"/>
+
+    <suppress checks="MethodLength"
+              files="KafkaConfigBackingStore.java"/>
+
+    <suppress checks="ParameterNumber"
+              files="WorkerSourceTask.java"/>
+    <suppress checks="ParameterNumber"
+              files="WorkerCoordinator.java"/>
+    <suppress checks="ParameterNumber"
+              files="ConfigKeyInfo.java"/>
+
+    <suppress checks="ClassDataAbstractionCoupling"
+              files="(RestServer|AbstractHerder|DistributedHerder).java"/>
+
+    <suppress checks="BooleanExpressionComplexity"
+              files="JsonConverter.java"/>
+
+    <suppress checks="CyclomaticComplexity"
+              files="ConnectRecord.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="JsonConverter.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="FileStreamSourceTask.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="DistributedHerder.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="KafkaConfigBackingStore.java"/>
+
+    <suppress checks="JavaNCSS"
+              files="KafkaConfigBackingStore.java"/>
+
+    <suppress checks="NPathComplexity"
+              files="ConnectRecord.java"/>
+    <suppress checks="NPathComplexity"
+              files="ConnectSchema.java"/>
+    <suppress checks="NPathComplexity"
+              files="FileStreamSourceTask.java"/>
+    <suppress checks="NPathComplexity"
+              files="JsonConverter.java"/>
+    <suppress checks="NPathComplexity"
+              files="DistributedHerder.java"/>
+
+    <!-- connect tests-->
+    <suppress checks="ClassDataAbstractionCoupling"
+              files="(DistributedHerder|KafkaBasedLog)Test.java"/>
+
+    <!-- Streams -->
+    <suppress checks="ClassFanOutComplexity"
+              files="(KafkaStreams|KStreamImpl|KTableImpl|StreamThread).java"/>
+    <suppress checks="ClassFanOutComplexity"
+              files="KStreamImpl.java"/>
+    <suppress checks="ClassFanOutComplexity"
+              files="KTableImpl.java"/>
+    <suppress checks="ClassFanOutComplexity"
+              files="StreamThread.java"/>
+
+    <suppress checks="MethodLength"
+              files="StreamPartitionAssignor.java"/>
+
+    <suppress checks="ParameterNumber"
+              files="StreamTask.java"/>
+    <suppress checks="ParameterNumber"
+              files="RocksDBWindowStoreSupplier.java"/>
+
+    <suppress checks="ClassDataAbstractionCoupling"
+              files="(KStreamImpl|StreamPartitionAssignor|KafkaStreams|KTableImpl).java"/>
+
+    <suppress checks="CyclomaticComplexity"
+              files="TopologyBuilder.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="StreamPartitionAssignor.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="StreamThread.java"/>
+
+    <suppress checks="JavaNCSS"
+              files="StreamPartitionAssignor.java"/>
+
+    <suppress checks="NPathComplexity"
+              files="ProcessorStateManager.java"/>
+    <suppress checks="NPathComplexity"
+              files="StreamPartitionAssignor.java"/>
+    <suppress checks="NPathComplexity"
+              files="StreamThread.java"/>
+
+    <!-- streams tests -->
+    <suppress checks="ClassFanOutComplexity"
+              files="(StreamTaskTest|ProcessorTopologyTestDriver).java"/>
+
+    <suppress checks="MethodLength"
+              files="KStreamKTableJoinIntegrationTest.java"/>
+    <suppress checks="MethodLength"
+              files="KStreamKStreamJoinTest.java"/>
+    <suppress checks="MethodLength"
+              files="KStreamWindowAggregateTest.java"/>
+
+    <suppress checks="ClassDataAbstractionCoupling"
+              files=".*/streams/.*/Test.java"/>
+    <suppress checks="ClassDataAbstractionCoupling"
+              files=".*/streams/.*test/.*.java"/>
+
+    <suppress checks="BooleanExpressionComplexity"
+              files="SmokeTestDriver.java"/>
+
+    <suppress checks="CyclomaticComplexity"
+              files="KStreamKStreamJoinTest.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="SmokeTestDriver.java"/>
+
+    <suppress checks="JavaNCSS"
+              files="KStreamKStreamJoinTest.java"/>
+    <suppress checks="JavaNCSS"
+              files="SmokeTestDriver.java"/>
+
+    <suppress checks="NPathComplexity"
+              files="KStreamKStreamJoinTest.java"/>
+    <suppress checks="NPathComplexity"
+              files="KStreamKStreamLeftJoinTest.java"/>
+
+    <!-- Tools -->
+    <suppress checks="ClassDataAbstractionCoupling"
+              files="VerifiableConsumer.java"/>
+
+    <suppress checks="CyclomaticComplexity"
+              files="StreamsResetter.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="ProducerPerformance.java"/>
+
+    <suppress checks="NPathComplexity"
+              files="StreamsResetter.java"/>
+    <suppress checks="NPathComplexity"
+              files="ProducerPerformance.java"/>
+
+    <!-- Log4J-Appender -->
+    <suppress checks="CyclomaticComplexity"
+              files="KafkaLog4jAppender.java"/>
+
+    <suppress checks="NPathComplexity"
+              files="KafkaLog4jAppender.java"/>
+
+</suppressions>

http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 1258763..c32aea7 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -316,7 +316,7 @@ public class Struct {
             Field f = this.schema.get(i);
             if (f.type() instanceof ArrayOf) {
                 if (this.get(f) != null) {
-                    Object[] arrayObject = (Object []) this.get(f);
+                    Object[] arrayObject = (Object[]) this.get(f);
                     for (Object arrayItem: arrayObject)
                         result = prime * result + arrayItem.hashCode();
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index 3391ff3..7111bad 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -58,7 +58,7 @@ public class SaslClientCallbackHandler implements AuthCallbackHandler {
                     nc.setName(nc.getDefaultName());
             } else if (callback instanceof PasswordCallback) {
                 if (!isKerberos && subject != null && !subject.getPrivateCredentials(String.class).isEmpty()) {
-                    char [] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray();
+                    char[] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray();
                     ((PasswordCallback) callback).setPassword(password);
                 } else {
                     String errorMessage = "Could not login: the client is being asked for
a password, but the Kafka" +

http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
index 7d725fa..b5a7a60 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
@@ -26,24 +26,24 @@ public class ProducerRecordTest {
 
     @Test
     public void testEqualsAndHashCode() {
-        ProducerRecord<String, Integer> producerRecord = new ProducerRecord<>("test", 1 , "key", 1);
+        ProducerRecord<String, Integer> producerRecord = new ProducerRecord<>("test", 1, "key", 1);
         assertEquals(producerRecord, producerRecord);
         assertEquals(producerRecord.hashCode(), producerRecord.hashCode());
 
-        ProducerRecord<String, Integer> equalRecord = new ProducerRecord<>("test", 1 , "key", 1);
+        ProducerRecord<String, Integer> equalRecord = new ProducerRecord<>("test", 1, "key", 1);
         assertEquals(producerRecord, equalRecord);
         assertEquals(producerRecord.hashCode(), equalRecord.hashCode());
 
-        ProducerRecord<String, Integer> topicMisMatch = new ProducerRecord<>("test-1", 1 , "key", 1);
+        ProducerRecord<String, Integer> topicMisMatch = new ProducerRecord<>("test-1", 1, "key", 1);
         assertFalse(producerRecord.equals(topicMisMatch));
 
-        ProducerRecord<String, Integer> partitionMismatch = new ProducerRecord<>("test", 2 , "key", 1);
+        ProducerRecord<String, Integer> partitionMismatch = new ProducerRecord<>("test", 2, "key", 1);
         assertFalse(producerRecord.equals(partitionMismatch));
 
-        ProducerRecord<String, Integer> keyMisMatch = new ProducerRecord<>("test", 1 , "key-1", 1);
+        ProducerRecord<String, Integer> keyMisMatch = new ProducerRecord<>("test", 1, "key-1", 1);
         assertFalse(producerRecord.equals(keyMisMatch));
 
-        ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<>("test", 1 , "key", 2);
+        ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<>("test", 1, "key", 2);
         assertFalse(producerRecord.equals(valueMisMatch));
 
         ProducerRecord<String, Integer> nullFieldsRecord = new ProducerRecord<>("topic", null, null, null, null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 8c81318..0eb3f7b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -388,8 +388,7 @@ public class StreamsConfig extends AbstractConfig {
     // this is the list of configs for underlying clients
     // that streams prefer different default values
     private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
-    static
-    {
+    static {
         final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100");
 
@@ -397,8 +396,7 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
-    static
-    {
+    static {
         final Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();
         tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
         tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
index d4757ab..7eb8300 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionKeySerde.java
@@ -112,15 +112,15 @@ public class SessionKeySerde<K> implements Serde<Windowed<K>> {
         }
     }
 
-    public static long extractEnd(final byte [] binaryKey) {
+    public static long extractEnd(final byte[] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
     }
 
-    public static long extractStart(final byte [] binaryKey) {
+    public static long extractStart(final byte[] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
     }
 
-    public static Window extractWindow(final byte [] binaryKey) {
+    public static Window extractWindow(final byte[] binaryKey) {
         final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
         final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
         final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);

http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 827a152..99f5d65 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -294,7 +294,7 @@ public class TopologyBuilder {
      * Enum used to define auto offset reset policy when creating {@link KStream} or {@link KTable}
      */
     public enum AutoOffsetReset {
-        EARLIEST , LATEST
+        EARLIEST, LATEST
     }
 
     /**
@@ -864,7 +864,7 @@ public class TopologyBuilder {
         }
     }
 
-    private Set<String> findSourceTopicsForProcessorParents(String [] parents) {
+    private Set<String> findSourceTopicsForProcessorParents(String[] parents) {
         final Set<String> sourceTopics = new HashSet<>();
         for (String parent : parents) {
             NodeFactory nodeFactory = nodeFactories.get(parent);

http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 039be44..dcaa222 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -131,8 +131,8 @@ public class KStreamAggregationDedupIntegrationTest {
 
         List<KeyValue<String, String>> results = receiveMessages(
             new StringDeserializer(),
-            new StringDeserializer()
-            , 5);
+            new StringDeserializer(),
+            5);
 
         Collections.sort(results, new Comparator<KeyValue<String, String>>() {
             @Override
@@ -181,8 +181,8 @@ public class KStreamAggregationDedupIntegrationTest {
 
         List<KeyValue<String, String>> windowedOutput = receiveMessages(
             new StringDeserializer(),
-            new StringDeserializer()
-            , 10);
+            new StringDeserializer(),
+            10);
 
         Comparator<KeyValue<String, String>>
             comparator =
@@ -233,8 +233,8 @@ public class KStreamAggregationDedupIntegrationTest {
 
         final List<KeyValue<String, Long>> results = receiveMessages(
             new StringDeserializer(),
-            new LongDeserializer()
-            , 5);
+            new LongDeserializer(),
+            5);
         Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
             @Override
             public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 2551717..4eb582c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -155,8 +155,8 @@ public class KStreamAggregationIntegrationTest {
 
         final List<KeyValue<String, String>> results = receiveMessages(
             new StringDeserializer(),
-            new StringDeserializer()
-            , 10);
+            new StringDeserializer(),
+            10);
 
         Collections.sort(results, new Comparator<KeyValue<String, String>>() {
             @Override
@@ -209,8 +209,8 @@ public class KStreamAggregationIntegrationTest {
 
         final List<KeyValue<String, String>> windowedOutput = receiveMessages(
             new StringDeserializer(),
-            new StringDeserializer()
-            , 15);
+            new StringDeserializer(),
+            15);
 
         final Comparator<KeyValue<String, String>>
             comparator =
@@ -263,8 +263,8 @@ public class KStreamAggregationIntegrationTest {
 
         final List<KeyValue<String, Integer>> results = receiveMessages(
             new StringDeserializer(),
-            new IntegerDeserializer()
-            , 10);
+            new IntegerDeserializer(),
+            10);
 
         Collections.sort(results, new Comparator<KeyValue<String, Integer>>() {
             @Override
@@ -313,8 +313,8 @@ public class KStreamAggregationIntegrationTest {
 
         final List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
             new StringDeserializer(),
-            new IntegerDeserializer()
-            , 15);
+            new IntegerDeserializer(),
+            15);
 
         final Comparator<KeyValue<String, Integer>>
             comparator =
@@ -364,8 +364,8 @@ public class KStreamAggregationIntegrationTest {
 
         final List<KeyValue<String, Long>> results = receiveMessages(
             new StringDeserializer(),
-            new LongDeserializer()
-            , 10);
+            new LongDeserializer(),
+            10);
         Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
             @Override
             public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
@@ -406,8 +406,8 @@ public class KStreamAggregationIntegrationTest {
 
         final List<KeyValue<String, Long>> results = receiveMessages(
             new StringDeserializer(),
-            new LongDeserializer()
-            , 10);
+            new LongDeserializer(),
+            10);
         Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
             @Override
             public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index c121d96..c8ab6f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -62,7 +62,7 @@ public class StreamsMetadataStateTest {
     private TopicPartition topic4P0;
     private List<PartitionInfo> partitionInfos;
     private Cluster cluster;
-    private final String globalTable = "global-table";;
+    private final String globalTable = "global-table";
     private StreamPartitioner<String, Object> partitioner;
 
     @Before
@@ -200,8 +200,9 @@ public class StreamsMetadataStateTest {
         final StreamsMetadata expected = new StreamsMetadata(hostThree, Utils.mkSet(globalTable, "table-three"),
                 Collections.singleton(topic3P0));
 
-        final StreamsMetadata actual = discovery.getMetadataWithKey("table-three", "the-key",
-                Serdes.String().serializer());
+        final StreamsMetadata actual = discovery.getMetadataWithKey("table-three",
+                                                                    "the-key",
+                                                                    Serdes.String().serializer());
 
         assertEquals(expected, actual);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/de05c9d3/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 5808e9a..b704aa7 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -202,7 +202,7 @@ public class ProcessorTopologyTestDriver {
             final MockConsumer<byte[], byte[]> globalConsumer = createGlobalConsumer();
             for (final String topicName : globalTopology.sourceTopics()) {
                 List<PartitionInfo> partitionInfos = new ArrayList<>();
-                partitionInfos.add(new PartitionInfo(topicName , 1, null, null, null));
+                partitionInfos.add(new PartitionInfo(topicName, 1, null, null, null));
                 globalConsumer.updatePartitions(topicName, partitionInfos);
                 final TopicPartition partition = new TopicPartition(topicName, 1);
                 globalConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));

