kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject [2/2] kafka git commit: KAFKA-2140 Improve code readability; reviewed by Neha Narkhede
Date Sun, 26 Apr 2015 15:40:59 GMT
KAFKA-2140 Improve code readability; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ed1a548c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ed1a548c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ed1a548c

Branch: refs/heads/trunk
Commit: ed1a548c503f041c025e00e75d338b4fc4a51f47
Parents: 622e707
Author: Ismael Juma <mlists@juma.me.uk>
Authored: Sun Apr 26 08:40:49 2015 -0700
Committer: Neha Narkhede <nehanarkhede@apache.org>
Committed: Sun Apr 26 08:40:58 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  | 16 ++++-----
 .../kafka/clients/consumer/ConsumerRecords.java |  8 ++---
 .../kafka/clients/producer/ProducerConfig.java  |  2 +-
 .../kafka/common/metrics/stats/Histogram.java   |  2 +-
 .../kafka/common/protocol/SecurityProtocol.java |  2 +-
 .../kafka/common/config/AbstractConfigTest.java | 12 +++----
 .../types/ProtocolSerializationTest.java        |  4 +--
 .../main/java/kafka/etl/KafkaETLContext.java    |  2 +-
 .../kafka/admin/ReassignPartitionsCommand.scala |  2 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |  2 +-
 .../kafka/api/ControlledShutdownRequest.scala   |  8 ++---
 .../kafka/api/ControlledShutdownResponse.scala  |  8 ++---
 .../kafka/api/GenericRequestAndHeader.scala     |  4 +--
 .../kafka/api/GenericResponseAndHeader.scala    |  4 +--
 .../scala/kafka/api/LeaderAndIsrRequest.scala   |  6 ++--
 .../scala/kafka/api/StopReplicaResponse.scala   |  8 ++---
 .../main/scala/kafka/api/TopicMetadata.scala    |  2 +-
 .../scala/kafka/api/TopicMetadataRequest.scala  | 10 +++---
 .../main/scala/kafka/client/ClientUtils.scala   |  2 +-
 .../src/main/scala/kafka/cluster/EndPoint.scala |  2 +-
 .../ConsumerReblanceFailedException.scala       | 26 ---------------
 .../controller/ControllerChannelManager.scala   |  4 +--
 .../kafka/controller/KafkaController.scala      |  4 +--
 .../kafka/controller/ReplicaStateMachine.scala  |  2 +-
 .../kafka/coordinator/DelayedRebalance.scala    |  3 +-
 .../main/scala/kafka/javaapi/Implicits.scala    |  5 ---
 .../javaapi/message/ByteBufferMessageSet.scala  |  6 ++--
 .../main/scala/kafka/log/CleanerConfig.scala    | 20 ++++++------
 core/src/main/scala/kafka/log/LogCleaner.scala  |  4 +--
 core/src/main/scala/kafka/log/LogConfig.scala   | 34 ++++++++++----------
 core/src/main/scala/kafka/log/OffsetIndex.scala |  4 +--
 core/src/main/scala/kafka/log/OffsetMap.scala   |  4 +--
 .../main/scala/kafka/log/OffsetPosition.scala   |  2 +-
 .../main/scala/kafka/message/MessageSet.scala   |  2 +-
 .../kafka/metrics/KafkaMetricsReporter.scala    |  2 +-
 .../main/scala/kafka/network/SocketServer.scala |  6 ++--
 .../scala/kafka/producer/KeyedMessage.scala     |  4 +--
 .../kafka/server/BrokerMetadataCheckpoint.scala |  2 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  2 +-
 .../scala/kafka/server/LogOffsetMetadata.scala  |  2 +-
 .../scala/kafka/server/ReplicaManager.scala     |  2 +-
 .../scala/kafka/tools/ConsoleConsumer.scala     |  4 +--
 .../scala/kafka/tools/ConsoleProducer.scala     |  4 +--
 .../scala/kafka/tools/ExportZkOffsets.scala     | 11 +++----
 .../scala/kafka/tools/ImportZkOffsets.scala     |  2 +-
 core/src/main/scala/kafka/tools/JmxTool.scala   |  6 ++--
 .../scala/kafka/tools/ProducerPerformance.scala |  2 +-
 .../scala/kafka/tools/SimpleConsumerShell.scala |  2 +-
 .../kafka/tools/VerifyConsumerRebalance.scala   |  8 ++---
 core/src/main/scala/kafka/utils/CoreUtils.scala |  4 +--
 .../src/main/scala/kafka/utils/Mx4jLoader.scala |  6 ++--
 core/src/main/scala/kafka/utils/Throttler.scala |  4 +--
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 10 +++---
 .../kafka/api/ConsumerBounceTest.scala          |  6 ++--
 .../kafka/api/IntegrationTestHarness.scala      |  6 ++--
 .../unit/kafka/admin/AddPartitionsTest.scala    |  4 +--
 .../test/scala/unit/kafka/admin/AdminTest.scala | 12 +++----
 .../unit/kafka/admin/DeleteTopicTest.scala      | 17 ++++------
 .../integration/KafkaServerTestHarness.scala    |  4 +--
 .../kafka/integration/RollingBounceTest.scala   |  4 +--
 .../integration/UncleanLeaderElectionTest.scala |  4 +--
 .../scala/unit/kafka/log/LogManagerTest.scala   |  2 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 16 ++++-----
 .../unit/kafka/server/LeaderElectionTest.scala  |  4 +--
 .../unit/kafka/server/ReplicaFetchTest.scala    |  5 +--
 .../unit/kafka/utils/IteratorTemplateTest.scala |  4 +--
 .../scala/unit/kafka/utils/MockScheduler.scala  |  8 ++---
 .../test/scala/unit/kafka/utils/TestUtils.scala | 10 +++---
 .../test/scala/unit/kafka/zk/ZKPathTest.scala   |  8 ++---
 .../java/kafka/examples/SimpleConsumerDemo.java |  4 +--
 70 files changed, 194 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 42c7219..bdff518 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -12,13 +12,6 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceCallback;
 import org.apache.kafka.common.config.AbstractConfig;
@@ -27,6 +20,13 @@ import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.serialization.Deserializer;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
 /**
  * The consumer configuration keys
  */
@@ -304,7 +304,7 @@ public class ConsumerConfig extends AbstractConfig {
         return newProperties;
     }
 
-    ConsumerConfig(Map<? extends Object, ? extends Object> props) {
+    ConsumerConfig(Map<?, ?> props) {
         super(CONFIG, props);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index 305ec8e..1ca75f8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -12,15 +12,15 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.AbstractIterator;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.AbstractIterator;
-
 /**
  * A container that holds the list {@link ConsumerRecord} per partition for a
  * particular topic. There is one for every topic returned by a
@@ -55,7 +55,7 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
             throw new IllegalArgumentException("Topic must be non-null.");
         List<List<ConsumerRecord<K, V>>> recs = new ArrayList<List<ConsumerRecord<K, V>>>();
         for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
-            if (entry.getKey().equals(topic))
+            if (entry.getKey().topic().equals(topic))
                 recs.add(entry.getValue());
         }
         return new ConcatenatedIterable<K, V>(recs);
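
Note: this one is a behavior fix, not just readability. The old code compared a
TopicPartition map key directly against a String, which can never be equal, so
records(topic) silently matched nothing. A minimal Scala sketch of the failure
mode (TopicPartition below is a hypothetical stand-in for
org.apache.kafka.common.TopicPartition):

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    case class TopicPartition(topic: String, partition: Int)

    val key = TopicPartition("events", 0)
    key.equals("events")        // false: a TopicPartition never equals a String
    key.topic.equals("events")  // true: compare the topic component, as the fix does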

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 5a57555..187d000 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -242,7 +242,7 @@ public class ProducerConfig extends AbstractConfig {
         return newProperties;
     }
 
-    ProducerConfig(Map<? extends Object, ? extends Object> props) {
+    ProducerConfig(Map<?, ?> props) {
         super(CONFIG, props);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java
index cf91f5f..c571b4b 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Histogram.java
@@ -58,7 +58,7 @@ public class Histogram {
 
     @Override
     public String toString() {
-        StringBuilder b = new StringBuilder('{');
+        StringBuilder b = new StringBuilder("{");
         for (int i = 0; i < this.hist.length - 1; i++) {
             b.append(String.format("%.10f", binScheme.fromBin(i)));
             b.append(':');
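
Also a real bug rather than style: java.lang.StringBuilder has no char
constructor, so '{' widens to the int 123 and selects the capacity constructor,
and the opening brace never reaches the output. A quick sketch:

    // '{' widens to Int (123) and picks StringBuilder(int capacity),
    // so the brace is silently dropped; the String "{" keeps it.
    val wrong = new java.lang.StringBuilder('{')
    val right = new java.lang.StringBuilder("{")
    wrong.length  // 0 -- empty builder with initial capacity 123
    right.length  // 1 -- contains "{"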

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
index d3394ee..dab1a94 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
@@ -49,7 +49,7 @@ public enum SecurityProtocol {
     }
 
     public static String getName(int id) {
-        return CODE_TO_SECURITY_PROTOCOL.get(id).name;
+        return CODE_TO_SECURITY_PROTOCOL.get((short) id).name;
     }
 
     public static List<String> getNames() {
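
Another behavior fix. Assuming CODE_TO_SECURITY_PROTOCOL is keyed by Short
(which the added cast implies), a lookup with an int boxes to Integer, and a
boxed Integer never equals a Short key, so get returned null and .name threw a
NullPointerException. Sketch under that assumption:

    // Hypothetical Short-keyed map, mirroring what the cast implies.
    val codeToProtocol = new java.util.HashMap[java.lang.Short, String]()
    codeToProtocol.put(0.toShort, "PLAINTEXT")

    val id = 0                      // an Int
    codeToProtocol.get(id)          // null: boxes to Integer, never equal to a Short key
    codeToProtocol.get(id.toShort)  // "PLAINTEXT": narrow to short first, as the fix does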

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 66442ed..db1b0ee 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -12,16 +12,16 @@
  */
 package org.apache.kafka.common.config;
 
-import static org.junit.Assert.fail;
-
-import java.util.Map;
-import java.util.Properties;
-
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.junit.Test;
 
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.fail;
+
 public class AbstractConfigTest {
 
     @Test
@@ -73,7 +73,7 @@ public class AbstractConfigTest {
                                             METRIC_REPORTER_CLASSES_DOC);
         }
 
-        public TestConfig(Map<? extends Object, ? extends Object> props) {
+        public TestConfig(Map<?, ?> props) {
             super(CONFIG, props);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
index 8b92634..6c335a1 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -43,8 +43,8 @@ public class ProtocolSerializationTest {
                                  new Field("struct", new Schema(new Field("field", Type.INT32))));
         this.struct = new Struct(this.schema).set("int8", (byte) 1)
                                              .set("int16", (short) 1)
-                                             .set("int32", (int) 1)
-                                             .set("int64", (long) 1)
+                                             .set("int32", 1)
+                                             .set("int64", 1L)
                                              .set("string", "1")
                                              .set("bytes", "1".getBytes())
                                              .set("array", new Object[] {1});

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
----------------------------------------------------------------------
diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
index 1d0e0a9..c9b9018 100644
--- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
+++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
@@ -159,7 +159,7 @@ public class KafkaETLContext {
         _response = _consumer.fetch(fetchRequest);
         if(_response != null) {
             _respIterator = new ArrayList<ByteBufferMessageSet>(){{
-                add((ByteBufferMessageSet) _response.messageSet(_request.getTopic(), _request.getPartition()));
+                add(_response.messageSet(_request.getTopic(), _request.getPartition()));
             }}.iterator();
         }
         _requestTime += (System.currentTimeMillis() - tempTime);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index bbe3362..acaa611 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -127,7 +127,7 @@ object ReassignPartitionsCommand extends Logging {
     }
     val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned.toMap)
     // before starting assignment, output the current replica assignment to facilitate rollback
-    val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq)
+    val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic))
     println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback"
       .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
     // start the reassignment

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 60f0228..8e6f186 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -143,7 +143,7 @@ object TopicCommand {
     topics.foreach { topic =>
       try {
         if (Topic.InternalTopics.contains(topic)) {
-          throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic));
+          throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
         } else {
           ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
           println("Topic %s is marked for deletion.".format(topic))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
index 5be393a..fe81635 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala
@@ -37,9 +37,9 @@ object ControlledShutdownRequest extends Logging {
   }
 }
 
-case class ControlledShutdownRequest(val versionId: Short,
-                                     val correlationId: Int,
-                                     val brokerId: Int)
+case class ControlledShutdownRequest(versionId: Short,
+                                     correlationId: Int,
+                                     brokerId: Int)
   extends RequestOrResponse(Some(RequestKeys.ControlledShutdownKey)){
 
   def this(correlationId: Int, brokerId: Int) =
@@ -74,4 +74,4 @@ case class ControlledShutdownRequest(val versionId: Short,
     controlledShutdownRequest.append("; BrokerId: " + brokerId)
     controlledShutdownRequest.toString()
   }
-}
\ No newline at end of file
+}
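
This pattern recurs throughout the commit (ControlledShutdownResponse,
StopReplicaResponse, TopicMetadataRequest, CleanerConfig, LogConfig,
KeyedMessage, and others below): case class parameters are public vals by
default, so the explicit modifier is pure noise. For example:

    case class Verbose(val brokerId: Int)  // legal, but `val` is redundant
    case class Concise(brokerId: Int)      // identical: public, immutable accessor

    Concise(1).brokerId                    // 1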

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
index 5e0a1cf..9ecdee7 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -39,9 +39,9 @@ object ControlledShutdownResponse {
 }
 
 
-case class ControlledShutdownResponse(val correlationId: Int,
-                                      val errorCode: Short = ErrorMapping.NoError,
-                                      val partitionsRemaining: Set[TopicAndPartition])
+case class ControlledShutdownResponse(correlationId: Int,
+                                      errorCode: Short = ErrorMapping.NoError,
+                                      partitionsRemaining: Set[TopicAndPartition])
   extends RequestOrResponse() {
   def sizeInBytes(): Int ={
     var size =
@@ -68,4 +68,4 @@ case class ControlledShutdownResponse(val correlationId: Int,
 
   override def describe(details: Boolean):String = { toString }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
index f40e19f..b0c6d7a 100644
--- a/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
+++ b/core/src/main/scala/kafka/api/GenericRequestAndHeader.scala
@@ -36,7 +36,7 @@ private[kafka] abstract class GenericRequestAndHeader(val versionId: Short,
     2 /* version id */ +
     4 /* correlation id */ +
     (2 + clientId.length) /* client id */ +
-    body.sizeOf();
+    body.sizeOf()
   }
 
   override def toString(): String = {
@@ -52,4 +52,4 @@ private[kafka] abstract class GenericRequestAndHeader(val versionId: Short,
     strBuffer.append("; Body: " + body.toString)
     strBuffer.toString()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
index a4879e2..748b5e9 100644
--- a/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
+++ b/core/src/main/scala/kafka/api/GenericResponseAndHeader.scala
@@ -29,7 +29,7 @@ private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int,
 
   def sizeInBytes(): Int = {
     4 /* correlation id */ +
-    body.sizeOf();
+    body.sizeOf()
   }
 
   override def toString(): String = {
@@ -43,4 +43,4 @@ private[kafka] abstract class GenericResponseAndHeader(val correlationId: Int,
     strBuffer.append("; Body: " + body.toString)
     strBuffer.toString()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 2fad585..431190a 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -59,8 +59,8 @@ object PartitionStateInfo {
   }
 }
 
-case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
-                              val allReplicas: Set[Int]) {
+case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                              allReplicas: Set[Int]) {
   def replicationFactor = allReplicas.size
 
   def writeTo(buffer: ByteBuffer) {
@@ -200,4 +200,4 @@ case class LeaderAndIsrRequest (versionId: Short,
       leaderAndIsrRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
     leaderAndIsrRequest.toString()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/api/StopReplicaResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
index 3431f3f..2fc3c95 100644
--- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
@@ -42,9 +42,9 @@ object StopReplicaResponse {
 }
 
 
-case class StopReplicaResponse(val correlationId: Int,
-                               val responseMap: Map[TopicAndPartition, Short],
-                               val errorCode: Short = ErrorMapping.NoError)
+case class StopReplicaResponse(correlationId: Int,
+                               responseMap: Map[TopicAndPartition, Short],
+                               errorCode: Short = ErrorMapping.NoError)
     extends RequestOrResponse() {
   def sizeInBytes(): Int ={
     var size =
@@ -72,4 +72,4 @@ case class StopReplicaResponse(val correlationId: Int,
   }
 
   override def describe(details: Boolean):String = { toString }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 5e39f45..bd866bc 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -109,7 +109,7 @@ object PartitionMetadata {
 }
 
 case class PartitionMetadata(partitionId: Int, 
-                             val leader: Option[BrokerEndPoint],
+                             leader: Option[BrokerEndPoint],
                              replicas: Seq[BrokerEndPoint],
                              isr: Seq[BrokerEndPoint] = Seq.empty,
                              errorCode: Short = ErrorMapping.NoError) extends Logging {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 7dca09c..363bae0 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -46,10 +46,10 @@ object TopicMetadataRequest extends Logging {
   }
 }
 
-case class TopicMetadataRequest(val versionId: Short,
-                                val correlationId: Int,
-                                val clientId: String,
-                                val topics: Seq[String])
+case class TopicMetadataRequest(versionId: Short,
+                                correlationId: Int,
+                                clientId: String,
+                                topics: Seq[String])
  extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
 
   def this(topics: Seq[String], correlationId: Int) =
@@ -93,4 +93,4 @@ case class TopicMetadataRequest(val versionId: Short,
       topicMetadataRequest.append("; Topics: " + topics.mkString(","))
     topicMetadataRequest.toString()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index b66424b..62394c0 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -74,7 +74,7 @@ object ClientUtils extends Logging{
     } else {
       debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
     }
-    return topicMetadataResponse
+    topicMetadataResponse
   }
 
   /**
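
Dropping return is idiomatic: the last expression of a Scala method body is its
result, and the keyword buys nothing (worse, inside a closure it is implemented
with a control-flow exception). The same cleanup recurs in OffsetIndex,
KafkaServer, LogOffsetMetadata, ImportZkOffsets, ProducerPerformance, and
ExportZkOffsets below. A minimal illustration:

    def fetchCount(xs: Seq[Int]): Int = {
      val n = xs.size
      n  // the last expression is the result; `return n` would add nothing
    }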

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/cluster/EndPoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala b/core/src/main/scala/kafka/cluster/EndPoint.scala
index 3286f6d..e9008e6 100644
--- a/core/src/main/scala/kafka/cluster/EndPoint.scala
+++ b/core/src/main/scala/kafka/cluster/EndPoint.scala
@@ -68,7 +68,7 @@ case class EndPoint(host: String, port: Int, protocolType: SecurityProtocol) {
   def writeTo(buffer: ByteBuffer): Unit = {
     buffer.putInt(port)
     writeShortString(buffer, host)
-    buffer.putShort(protocolType.id.toShort)
+    buffer.putShort(protocolType.id)
   }
 
   def sizeInBytes: Int =

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/common/ConsumerReblanceFailedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ConsumerReblanceFailedException.scala b/core/src/main/scala/kafka/common/ConsumerReblanceFailedException.scala
deleted file mode 100644
index ae5018d..0000000
--- a/core/src/main/scala/kafka/common/ConsumerReblanceFailedException.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-/**
- * Thrown when a request is made for broker but no brokers with that topic
- * exist.
- */
-class ConsumerRebalanceFailedException(message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 97acdb2..1b22310 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -309,8 +309,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
     }
     updateMetadataRequestMap.clear()
     stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
-      val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet
-      val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet
+      val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
+      val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
       debug("The stop replica request (delete = true) sent to broker %d is %s"
         .format(broker, stopReplicaWithDelete.mkString(",")))
       debug("The stop replica request (delete = false) sent to broker %d is %s"
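
Comparing a Boolean field against true or false is redundant; the field already
is the predicate, and filterNot covers the negated case directly. Sketch with a
hypothetical stand-in for the replica info type:

    case class ReplicaInfo(replica: Int, deletePartition: Boolean)  // hypothetical stand-in

    val infos = List(ReplicaInfo(1, deletePartition = true),
                     ReplicaInfo(2, deletePartition = false))
    infos.filter(_.deletePartition).map(_.replica)     // List(1)
    infos.filterNot(_.deletePartition).map(_.replica)  // List(2)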

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 3a09377..a635116 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -647,7 +647,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
    */
   def startup() = {
     inLock(controllerContext.controllerLock) {
-      info("Controller starting up");
+      info("Controller starting up")
       registerSessionExpirationListener()
       isRunning = true
       controllerElector.startup
@@ -1326,7 +1326,7 @@ case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
   }
 }
 
-case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
+case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
   override def toString(): String = {
     val leaderAndIsrInfo = new StringBuilder
     leaderAndIsrInfo.append("(Leader:" + leaderAndIsr.leader)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index e5c56e0..3a44fdc 100755
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -282,7 +282,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     val replicasForTopic = controller.controllerContext.replicasForTopic(topic)
     val replicaStatesForTopic = replicasForTopic.map(r => (r, replicaState(r))).toMap
     debug("Are all replicas for topic %s deleted %s".format(topic, replicaStatesForTopic))
-    replicaStatesForTopic.foldLeft(true)((deletionState, r) => deletionState && r._2 == ReplicaDeletionSuccessful)
+    replicaStatesForTopic.forall(_._2 == ReplicaDeletionSuccessful)
   }
 
   def isAtLeastOneReplicaInDeletionStartedState(topic: String): Boolean = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
index 8defa2e..60fbdae 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
@@ -41,8 +41,7 @@ class DelayedRebalance(sessionTimeout: Long,
 
   /* check if all known consumers have requested to re-join group */
   override def tryComplete(): Boolean = {
-    allConsumersJoinedGroup.set(groupRegistry.memberRegistries.values.foldLeft
-      (true) ((agg, cur) => agg && cur.joinGroupReceived.get()))
+    allConsumersJoinedGroup.set(groupRegistry.memberRegistries.values.forall(_.joinGroupReceived.get()))
 
     if (allConsumersJoinedGroup.get())
       forceComplete()
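
Here, as in ReplicaStateMachine above, a Boolean foldLeft becomes forall, which
states the intent directly and also short-circuits on the first false instead
of visiting every element:

    val joined = Map("c1" -> true, "c2" -> true, "c3" -> false)

    joined.values.foldLeft(true)((acc, j) => acc && j)  // false, but touches every entry
    joined.values.forall(identity)                      // false, stops at the first false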

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/javaapi/Implicits.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/Implicits.scala b/core/src/main/scala/kafka/javaapi/Implicits.scala
index 8baf4d4..c69b0a3 100644
--- a/core/src/main/scala/kafka/javaapi/Implicits.scala
+++ b/core/src/main/scala/kafka/javaapi/Implicits.scala
@@ -47,9 +47,4 @@ private[javaapi] object Implicits extends Logging {
     }
   }
 
-  // used explicitly by ByteBufferMessageSet constructor as due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors
-  implicit def javaListToScalaBuffer[A](l: java.util.List[A]) = {
-    import scala.collection.JavaConversions._
-    l: collection.mutable.Buffer[A]
-  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
index 0125565..df30279 100644
--- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
@@ -19,14 +19,14 @@ package kafka.javaapi.message
 import java.util.concurrent.atomic.AtomicLong
 import java.nio.ByteBuffer
 import kafka.message._
-import kafka.javaapi.Implicits.javaListToScalaBuffer
+
+import scala.collection.JavaConverters._
 
 class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet {
   private val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer)
   
   def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
-    // due to SI-4141 which affects Scala 2.8.1, implicits are not visible in constructors and must be used explicitly
-    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), javaListToScalaBuffer(messages).toSeq : _*).buffer)
+    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), messages.asScala: _*).buffer)
   }
 
   def this(messages: java.util.List[Message]) {
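
This replaces the hand-rolled implicit deleted from kafka.javaapi.Implicits
with the standard JavaConverters API, which makes the Java-to-Scala conversion
visible at the call site:

    import scala.collection.JavaConverters._

    val javaList = java.util.Arrays.asList("a", "b", "c")
    val buffer: scala.collection.mutable.Buffer[String] = javaList.asScala
    buffer.mkString(",")  // "a,b,c" -- the conversion is explicit, not an invisible implicit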

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/log/CleanerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/CleanerConfig.scala b/core/src/main/scala/kafka/log/CleanerConfig.scala
index ade8386..782bc9a 100644
--- a/core/src/main/scala/kafka/log/CleanerConfig.scala
+++ b/core/src/main/scala/kafka/log/CleanerConfig.scala
@@ -29,13 +29,13 @@ package kafka.log
  * @param enableCleaner Allows completely disabling the log cleaner
  * @param hashAlgorithm The hash algorithm to use in key comparison.
  */
-case class CleanerConfig(val numThreads: Int = 1, 
-                         val dedupeBufferSize: Long = 4*1024*1024L,
-                         val dedupeBufferLoadFactor: Double = 0.9d,
-                         val ioBufferSize: Int = 1024*1024,
-                         val maxMessageSize: Int = 32*1024*1024,
-                         val maxIoBytesPerSecond: Double = Double.MaxValue,
-                         val backOffMs: Long = 15 * 1000,
-                         val enableCleaner: Boolean = true,
-                         val hashAlgorithm: String = "MD5") {
-}
\ No newline at end of file
+case class CleanerConfig(numThreads: Int = 1,
+                         dedupeBufferSize: Long = 4*1024*1024L,
+                         dedupeBufferLoadFactor: Double = 0.9d,
+                         ioBufferSize: Int = 1024*1024,
+                         maxMessageSize: Int = 32*1024*1024,
+                         maxIoBytesPerSecond: Double = Double.MaxValue,
+                         backOffMs: Long = 15 * 1000,
+                         enableCleaner: Boolean = true,
+                         hashAlgorithm: String = "MD5") {
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 12eacdf..abea8b2 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -72,7 +72,7 @@ class LogCleaner(val config: CleanerConfig,
                  time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
   
   /* for managing the state of partitions being cleaned. package-private to allow access in tests */
-  private[log] val cleanerManager = new LogCleanerManager(logDirs, logs);
+  private[log] val cleanerManager = new LogCleanerManager(logDirs, logs)
 
   /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
   private val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, 
@@ -622,4 +622,4 @@ private case class LogToClean(topicPartition: TopicAndPartition, log: Log, first
   val cleanableRatio = dirtyBytes / totalBytes.toDouble
   def totalBytes = cleanBytes + dirtyBytes
   override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index da55a34..a907da0 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -63,23 +63,23 @@ object Defaults {
  * @param compressionType compressionType for a given topic
  *
  */
-case class LogConfig(val segmentSize: Int = Defaults.SegmentSize,
-                     val segmentMs: Long = Defaults.SegmentMs,
-                     val segmentJitterMs: Long = Defaults.SegmentJitterMs,
-                     val flushInterval: Long = Defaults.FlushInterval,
-                     val flushMs: Long = Defaults.FlushMs,
-                     val retentionSize: Long = Defaults.RetentionSize,
-                     val retentionMs: Long = Defaults.RetentionMs,
-                     val maxMessageSize: Int = Defaults.MaxMessageSize,
-                     val maxIndexSize: Int = Defaults.MaxIndexSize,
-                     val indexInterval: Int = Defaults.IndexInterval,
-                     val fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs,
-                     val deleteRetentionMs: Long = Defaults.DeleteRetentionMs,
-                     val minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
-                     val compact: Boolean = Defaults.Compact,
-                     val uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
-                     val minInSyncReplicas: Int = Defaults.MinInSyncReplicas,
-                     val compressionType: String = Defaults.CompressionType) {
+case class LogConfig(segmentSize: Int = Defaults.SegmentSize,
+                     segmentMs: Long = Defaults.SegmentMs,
+                     segmentJitterMs: Long = Defaults.SegmentJitterMs,
+                     flushInterval: Long = Defaults.FlushInterval,
+                     flushMs: Long = Defaults.FlushMs,
+                     retentionSize: Long = Defaults.RetentionSize,
+                     retentionMs: Long = Defaults.RetentionMs,
+                     maxMessageSize: Int = Defaults.MaxMessageSize,
+                     maxIndexSize: Int = Defaults.MaxIndexSize,
+                     indexInterval: Int = Defaults.IndexInterval,
+                     fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs,
+                     deleteRetentionMs: Long = Defaults.DeleteRetentionMs,
+                     minCleanableRatio: Double = Defaults.MinCleanableDirtyRatio,
+                     compact: Boolean = Defaults.Compact,
+                     uncleanLeaderElectionEnable: Boolean = Defaults.UncleanLeaderElectionEnable,
+                     minInSyncReplicas: Int = Defaults.MinInSyncReplicas,
+                     compressionType: String = Defaults.CompressionType) {
 
   def toProps: Properties = {
     val props = new Properties()

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 4ab22de..a1082ae 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -375,10 +375,10 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
     if(Os.isWindows)
       lock.lock()
     try {
-      return fun
+      fun
     } finally {
       if(Os.isWindows)
         lock.unlock()
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/log/OffsetMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala
index 2940e47..303aad5 100755
--- a/core/src/main/scala/kafka/log/OffsetMap.scala
+++ b/core/src/main/scala/kafka/log/OffsetMap.scala
@@ -69,7 +69,7 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
   /**
    * The maximum number of entries this map can contain
    */
-  val slots: Int = (memory / bytesPerEntry).toInt
+  val slots: Int = memory / bytesPerEntry
   
   /**
    * Associate this offset to the given key.
@@ -177,4 +177,4 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
     digest.digest(buffer, 0, hashSize)
   }
   
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/log/OffsetPosition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetPosition.scala b/core/src/main/scala/kafka/log/OffsetPosition.scala
index 6cefde4..24b6dcf 100644
--- a/core/src/main/scala/kafka/log/OffsetPosition.scala
+++ b/core/src/main/scala/kafka/log/OffsetPosition.scala
@@ -22,4 +22,4 @@ package kafka.log
  * in some log file of the beginning of the message set entry with the
  * given offset.
  */
-case class OffsetPosition(val offset: Long, val position: Int)
\ No newline at end of file
+case class OffsetPosition(offset: Long, position: Int)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/message/MessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala
index f1b8432..28b56e6 100644
--- a/core/src/main/scala/kafka/message/MessageSet.scala
+++ b/core/src/main/scala/kafka/message/MessageSet.scala
@@ -43,7 +43,7 @@ object MessageSet {
     var size = 0
     val iter = messages.iterator
     while(iter.hasNext) {
-      val message = iter.next.asInstanceOf[Message]
+      val message = iter.next
       size += entrySize(message)
     }
     size

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
index 30fd0ea..0d6da34 100755
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
@@ -52,7 +52,7 @@ object KafkaMetricsReporter {
 
   def startReporters (verifiableProps: VerifiableProperties) {
     ReporterStarted synchronized {
-      if (ReporterStarted.get() == false) {
+      if (!ReporterStarted.get()) {
         val metricsConfig = new KafkaMetricsConfig(verifiableProps)
         if(metricsConfig.reporters.size > 0) {
           metricsConfig.reporters.foreach(reporterType => {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index c5fec00..b9bedde 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -87,7 +87,7 @@ class SocketServer(val brokerId: Int,
           quotas,
           connectionsMaxIdleMs,
           portToProtocol)
-        Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start();
+        Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start()
       }
     }
 
@@ -244,7 +244,7 @@ private[kafka] class Acceptor(val host: String,
    * Accept loop that checks for new connection attempts
    */
   def run() {
-    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
+    serverChannel.register(selector, SelectionKey.OP_ACCEPT)
     startupComplete()
     var currentProcessor = 0
     while(isRunning) {
@@ -480,7 +480,7 @@ private[kafka] class Processor(val id: Int,
       key.attach(receive)
     }
     val read = receive.readFrom(socketChannel)
-    val address = socketChannel.socket.getRemoteSocketAddress();
+    val address = socketChannel.socket.getRemoteSocketAddress()
     trace(read + " bytes read from " + address)
     if(read < 0) {
       close(key)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/producer/KeyedMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/KeyedMessage.scala b/core/src/main/scala/kafka/producer/KeyedMessage.scala
index 388bc9b..dbcf295 100644
--- a/core/src/main/scala/kafka/producer/KeyedMessage.scala
+++ b/core/src/main/scala/kafka/producer/KeyedMessage.scala
@@ -21,7 +21,7 @@ package kafka.producer
  * A topic, key, and value.
  * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
  */
-case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
+case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) {
   if(topic == null)
     throw new IllegalArgumentException("Topic cannot be null.")
   
@@ -39,4 +39,4 @@ case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, v
   }
   
   def hasKey = key != null
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index 01e8f72..6e8d68d 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -72,7 +72,7 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging {
         }
       } catch {
         case e: FileNotFoundException =>
-          warn("No meta.properties file under dir %s".format(file.getAbsolutePath(), e.getMessage))
+          warn("No meta.properties file under dir %s".format(file.getAbsolutePath()))
           None
         case e1: Exception =>
           error("Failed to read meta.properties file under dir %s due to %s".format(file.getAbsolutePath(), e1.getMessage))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index c63f4ba..d401bac 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -458,7 +458,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       checkpoint.write(new BrokerMetadata(brokerId))
     }
 
-    return brokerId
+    brokerId
   }
 
   private def generateBrokerId: Int = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
index a868334..00b60fe 100644
--- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
+++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
@@ -26,7 +26,7 @@ object LogOffsetMetadata {
 
   class OffsetOrdering extends Ordering[LogOffsetMetadata] {
     override def compare(x: LogOffsetMetadata , y: LogOffsetMetadata ): Int = {
-      return x.offsetDiff(y).toInt
+      x.offsetDiff(y).toInt
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 8ddd325..59c9bc3 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -242,7 +242,7 @@ class ReplicaManager(val config: KafkaConfig,
   def getReplicaOrException(topic: String, partition: Int): Replica = {
     val replicaOpt = getReplica(topic, partition)
     if(replicaOpt.isDefined)
-      return replicaOpt.get
+      replicaOpt.get
     else
       throw new ReplicaNotAvailableException("Replica %d is not available for partition [%s,%d]".format(config.brokerId, topic, partition))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 80b2674..bba3990 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -69,7 +69,7 @@ object ConsoleConsumer extends Logging {
             .withRequiredArg
             .describedAs("prop")
             .ofType(classOf[String])
-    val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up");
+    val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up")
     val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
             "start with the earliest message present in the log rather than the latest message.")
     val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
@@ -209,7 +209,7 @@ object ConsoleConsumer extends Logging {
 
   def checkZkPathExists(zkUrl: String, path: String): Boolean = {
     try {
-      val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer);
+      val zk = new ZkClient(zkUrl, 30*1000,30*1000, ZKStringSerializer)
       zk.exists(path)
     } catch {
       case _: Throwable => false

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 00265f9..6971e6e 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -74,7 +74,7 @@ object ConsoleProducer {
 
   def getOldProducerProps(config: ProducerConfig): Properties = {
 
-    val props = new Properties;
+    val props = new Properties
 
     props.putAll(config.extraProducerProps)
 
@@ -100,7 +100,7 @@ object ConsoleProducer {
 
   def getNewProducerProps(config: ProducerConfig): Properties = {
 
-    val props = new Properties;
+    val props = new Properties
 
     props.putAll(config.extraProducerProps)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index 4d051bc..ce14bbc 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -114,11 +114,10 @@ object ExportZkOffsets extends Logging {
     }
   }
 
-  private def getBrokeridPartition(zkClient: ZkClient, consumerGroup: String, topic: String): List[String] = {
-    return ZkUtils.getChildrenParentMayNotExist(zkClient, "/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList
-  }
+  private def getBrokeridPartition(zkClient: ZkClient, consumerGroup: String, topic: String): List[String] =
+    ZkUtils.getChildrenParentMayNotExist(zkClient, "/consumers/%s/offsets/%s".format(consumerGroup, topic)).toList
   
-  private def getTopicsList(zkClient: ZkClient, consumerGroup: String): List[String] = {
-    return ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(consumerGroup)).toList
-  }
+  private def getTopicsList(zkClient: ZkClient, consumerGroup: String): List[String] =
+    ZkUtils.getChildren(zkClient, "/consumers/%s/offsets".format(consumerGroup)).toList
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index abe0972..598350d 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -89,7 +89,7 @@ object ImportZkOffsets extends Logging {
       s = br.readLine()
     }
     
-    return partOffsetsMap
+    partOffsetsMap
   }
   
   private def updateZkOffsets(zkClient: ZkClient, partitionOffsets: Map[String,String]): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/tools/JmxTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 1d1a120..c2b2030 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -103,7 +103,7 @@ object JmxTool extends Logging {
 
     // print csv header
     val keys = List("time") ++ queryAttributes(mbsc, names, attributesWhitelist).keys.toArray.sorted
-    if(keys.size == numExpectedAttributes.map(_._2).foldLeft(0)(_ + _) + 1)
+    if(keys.size == numExpectedAttributes.map(_._2).sum + 1)
       println(keys.map("\"" + _ + "\"").mkString(","))
 
     while(true) {
@@ -113,7 +113,7 @@ object JmxTool extends Logging {
         case Some(dFormat) => dFormat.format(new Date)
         case None => System.currentTimeMillis().toString
       }
-      if(attributes.keySet.size == numExpectedAttributes.map(_._2).foldLeft(0)(_ + _) + 1)
+      if(attributes.keySet.size == numExpectedAttributes.map(_._2).sum + 1)
         println(keys.map(attributes(_)).mkString(","))
       val sleep = max(0, interval - (System.currentTimeMillis - start))
       Thread.sleep(sleep)
@@ -137,4 +137,4 @@ object JmxTool extends Logging {
     attributes
   }
 
-}
\ No newline at end of file
+}
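
The foldLeft(0)(_ + _) to .sum change is behavior-preserving; .sum just
states the intent directly. A standalone sketch of the equivalence
(sample data hypothetical):

    val counts = Seq(("attrA", 2), ("attrB", 3))
    val viaFold = counts.map(_._2).foldLeft(0)(_ + _) // 5
    val viaSum  = counts.map(_._2).sum                // 5
    assert(viaFold == viaSum)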

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index bc25cd2..71b1bd5 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -235,7 +235,7 @@ object ProducerPerformance extends Logging {
 
       val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x')
       debug(seqMsgString)
-      return seqMsgString.getBytes()
+      seqMsgString.getBytes()
     }
 
     private def generateProducerData(topic: String, messageId: Long): Array[Byte] = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index dec9516..1c2023c 100755
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -148,7 +148,7 @@ object SimpleConsumerShell extends Logging {
     if(replicaId == UseLeaderReplica) {
       replicaOpt = partitionMetadataOpt.get.leader
       if(!replicaOpt.isDefined) {
-        System.err.println("Error: user speicifies to fetch from leader for partition (%s, %d) which has not been elected yet".format(replicaId, topic, partitionId))
+        System.err.println("Error: fetching from the leader was requested for partition (%s, %d), but a leader has not been elected yet".format(topic, partitionId))
         System.exit(1)
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index aef8361..4fb519b 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -79,9 +79,7 @@ object VerifyConsumerRebalance extends Logging {
     val consumersPerTopicMap = ZkUtils.getConsumersPerTopic(zkClient, group, excludeInternalTopics = false)
     val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, consumersPerTopicMap.keySet.toSeq)
 
-    partitionsPerTopicMap.foreach { partitionsForTopic =>
-      val topic = partitionsForTopic._1
-      val partitions = partitionsForTopic._2
+    partitionsPerTopicMap.foreach { case (topic, partitions) =>
       val topicDirs = new ZKGroupTopicDirs(group, topic)
       info("Alive partitions for topic %s are %s ".format(topic, partitions.toString))
       info("Alive consumers for topic %s => %s ".format(topic, consumersPerTopicMap.get(topic)))
@@ -95,8 +93,8 @@ object VerifyConsumerRebalance extends Logging {
 
       // for each available partition for topic, check if an owner exists
       partitions.foreach { partition =>
-      // check if there is a node for [partition]
-        if(!partitionsWithOwners.exists(p => p.equals(partition))) {
+        // check if there is a node for [partition]
+        if(!partitionsWithOwners.contains(partition.toString)) {
           error("No owner for partition [%s,%d]".format(topic, partition))
           rebalanceSucceeded = false
         }

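Destructuring the map entry with case (topic, partitions), as above,
replaces the nameless ._1/._2 accessors. A minimal sketch with
hypothetical data:

    val partitionsPerTopic = Map("t1" -> Seq(0, 1), "t2" -> Seq(0))
    partitionsPerTopic.foreach { case (topic, partitions) =>
      println("Alive partitions for topic %s are %s".format(topic, partitions.mkString(",")))
    }
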
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index c473a03..98abc45 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -102,7 +102,7 @@ object CoreUtils extends Logging {
    * Recursively delete the list of files/directories and any subfiles (if any exist)
    * @param files sequence of files to be deleted
    */
-  def rm(files: Seq[String]): Unit = files.map(f => rm(new File(f)))
+  def rm(files: Seq[String]): Unit = files.foreach(f => rm(new File(f)))
 
   /**
    * Recursively delete the given file/directory and any subfiles (if any exist)
@@ -230,7 +230,7 @@ object CoreUtils extends Logging {
   def createObject[T<:AnyRef](className: String, args: AnyRef*): T = {
     val klass = Class.forName(className).asInstanceOf[Class[T]]
     val constructor = klass.getConstructor(args.map(_.getClass): _*)
-    constructor.newInstance(args: _*).asInstanceOf[T]
+    constructor.newInstance(args: _*)
   }
 
   /**

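Two readability points from this file: map builds a result collection
that the caller throws away, so foreach better signals a traversal done
only for its side effects; and since Class[T].getConstructor(...) yields
a Constructor[T] whose newInstance already returns T, the asInstanceOf[T]
cast was redundant. A sketch of the first point (file names hypothetical):

    val files = Seq("a.tmp", "b.tmp")
    // discouraged: allocates and discards a Seq[Unit]
    files.map(f => println("deleting " + f))
    // preferred: advertises that only the side effect matters
    files.foreach(f => println("deleting " + f))
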
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/utils/Mx4jLoader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Mx4jLoader.scala b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
index 7417897..aa120ab 100644
--- a/core/src/main/scala/kafka/utils/Mx4jLoader.scala
+++ b/core/src/main/scala/kafka/utils/Mx4jLoader.scala
@@ -39,7 +39,7 @@ object Mx4jLoader extends Logging {
     val address = props.getString("mx4jaddress", "0.0.0.0")
     val port = props.getInt("mx4jport", 8082)
     try {
-      debug("Will try to load MX4j now, if it's in the classpath");
+      debug("Will try to load MX4j now, if it's in the classpath")
 
       val mbs = ManagementFactory.getPlatformMBeanServer()
       val processorName = new ObjectName("Server:name=XSLTProcessor")
@@ -62,10 +62,10 @@ object Mx4jLoader extends Logging {
     }
     catch {
 	  case e: ClassNotFoundException => {
-        info("Will not load MX4J, mx4j-tools.jar is not in the classpath");
+        info("Will not load MX4J, mx4j-tools.jar is not in the classpath")
       }
       case e: Throwable => {
-        warn("Could not start register mbean in JMX", e);
+        warn("Could not register MBean in JMX", e)
       }
     }
     false

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/utils/Throttler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala
index d1a144d..998ade1 100644
--- a/core/src/main/scala/kafka/utils/Throttler.scala
+++ b/core/src/main/scala/kafka/utils/Throttler.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.utils;
+package kafka.utils
 
 import kafka.metrics.KafkaMetricsGroup
 import java.util.concurrent.TimeUnit
@@ -95,4 +95,4 @@ object Throttler {
       }
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 5685a1e..1da8f90 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -498,7 +498,7 @@ object ZkUtils extends Logging {
     try {
       client.getChildren(path)
     } catch {
-      case e: ZkNoNodeException => return Nil
+      case e: ZkNoNodeException => Nil
       case e2: Throwable => throw e2
     }
   }
@@ -728,21 +728,19 @@ object ZkUtils extends Logging {
   def getSequenceId(client: ZkClient, path: String): Int = {
     try {
       val stat = client.writeDataReturnStat(path, "", -1)
-      return stat.getVersion
+      stat.getVersion
     } catch {
       case e: ZkNoNodeException => {
         createParentPath(client, BrokerSequenceIdPath)
         try {
           client.createPersistent(BrokerSequenceIdPath, "")
-          return 0
+          0
         } catch {
           case e: ZkNodeExistsException =>
             val stat = client.writeDataReturnStat(BrokerSequenceIdPath, "", -1)
-            return stat.getVersion
-          case e2: Throwable => throw e2
+            stat.getVersion
         }
       }
-      case e2: Throwable => throw e2
     }
   }
 

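The deleted "case e2: Throwable => throw e2" arms were no-ops: an
exception that matches no case in a Scala catch block propagates on its
own, so rethrowing it explicitly only adds noise. A minimal sketch:

    def parseOrZero(s: String): Int =
      try s.toInt
      catch {
        case _: NumberFormatException => 0
        // anything else propagates without an explicit rethrow
      }

    parseOrZero("42")   // 42
    parseOrZero("oops") // 0
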
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index 35f4f46..5c4cca6 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -71,7 +71,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   def consumeWithBrokerFailures(numIters: Int) {
     val numRecords = 1000
     sendRecords(numRecords)
-    this.producers.map(_.close)
+    this.producers.foreach(_.close)
 
     var consumed = 0
     val consumer = this.consumers(0)
@@ -100,7 +100,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
   def seekAndCommitWithBrokerFailures(numIters: Int) {
     val numRecords = 1000
     sendRecords(numRecords)
-    this.producers.map(_.close)
+    this.producers.foreach(_.close)
 
     val consumer = this.consumers(0)
     consumer.subscribe(tp)
@@ -151,4 +151,4 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     }
     futures.map(_.get)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 02d2627..2bbd4c9 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -44,7 +44,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
 
   override def generateConfigs() = {
     val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect)
-    cfgs.map(_.putAll(serverConfig))
+    cfgs.foreach(_.putAll(serverConfig))
     cfgs.map(KafkaConfig.fromProps)
   }
 
@@ -70,8 +70,8 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
   }
   
   override def tearDown() {
-    producers.map(_.close())
-    consumers.map(_.close())
+    producers.foreach(_.close())
+    consumers.foreach(_.close())
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index ab5d16c..df5c6ba 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -55,8 +55,8 @@ class AddPartitionsTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   override def tearDown() {
-    servers.map(server => server.shutdown())
-    servers.map(server => CoreUtils.rm(server.config.logDirs))
+    servers.foreach(_.shutdown())
+    servers.foreach(server => CoreUtils.rm(server.config.logDirs))
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 4b728a1..efb2f8e 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -301,7 +301,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
-    val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
+    val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
     // broker 2 should be the leader since it was started first
     val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition, oldLeaderOpt = None).get
     // trigger preferred replica election
@@ -319,7 +319,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     val partition = 1
     // create brokers
     val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
-    val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
+    val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
     // create the topic
     TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment = expectedReplicaAssignment, servers = servers)
 
@@ -330,7 +330,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     try {
       // wait for the update metadata request to trickle to the brokers
       TestUtils.waitUntilTrue(() =>
-        activeServers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
+        activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3),
         "Topic test not created after timeout")
       assertEquals(0, partitionsRemaining.size)
       var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
@@ -346,11 +346,11 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
 
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
       partitionsRemaining = controller.shutdownBroker(0)
       assertEquals(1, partitionsRemaining.size)
       // leader doesn't change since all the replicas are shut down
-      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
     }
     finally {
       servers.foreach(_.shutdown())
@@ -397,7 +397,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       checkConfig(2*maxMessageSize, 2 * retentionMs)
     } finally {
       server.shutdown()
-      server.config.logDirs.map(CoreUtils.rm(_))
+      server.config.logDirs.foreach(CoreUtils.rm(_))
     }
   }
 

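forall is not just shorter than foldLeft(true)(_ && _): it also
short-circuits on the first false element instead of visiting all of
them. Similarly, reverseMap avoids the intermediate collection that
reverse.map builds. A standalone sketch with hypothetical values:

    val isrSizes = Seq(3, 3, 2)
    val viaFold   = isrSizes.foldLeft(true)(_ && _ == 3) // visits every element
    val viaForall = isrSizes.forall(_ == 3)              // stops at the mismatch
    assert(viaFold == viaForall)                         // both false here

    val ids = Seq("b0", "b1", "b2")
    ids.reverseMap(_.toUpperCase)  // List(B2, B1, B0), no intermediate reverse
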
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 61cc602..fa8ce25 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -54,7 +54,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
    // check if all replicas but the one that is shut down have deleted the log
     TestUtils.waitUntilTrue(() =>
       servers.filter(s => s.config.brokerId != follower.config.brokerId)
-        .foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty), "Replicas 0,1 have not deleted log.")
+        .forall(_.getLogManager().getLog(topicAndPartition).isEmpty), "Replicas 0,1 have not deleted log.")
     // ensure topic deletion is halted
    TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
      "Admin path /admin/delete_topic/test deleted even when a follower replica is down")
@@ -104,8 +104,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // wait until replica log is created on every broker
-    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
-      res && server.getLogManager().getLog(topicAndPartition).isDefined),
+    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
       "Replicas for topic test not created.")
     val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
     assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
@@ -155,7 +154,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
     // verify that new partition doesn't exist on any broker either
     TestUtils.waitUntilTrue(() =>
-      servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty),
+      servers.forall(_.getLogManager().getLog(newPartition).isEmpty),
      "Replica logs for new partition [test,1] not deleted after delete topic is complete.")
     servers.foreach(_.shutdown())
   }
@@ -173,7 +172,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
     // verify that new partition doesn't exist on any broker either
     assertTrue("Replica logs not deleted after delete topic is complete",
-      servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty))
+      servers.forall(_.getLogManager().getLog(newPartition).isEmpty))
     servers.foreach(_.shutdown())
   }
 
@@ -192,7 +191,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
     assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined)
     // check if all replica logs are created
-    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined),
+    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
       "Replicas for topic test not created.")
     servers.foreach(_.shutdown())
   }
@@ -207,8 +206,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     // verify delete topic path for test2 is removed from zookeeper
     TestUtils.verifyTopicDeletion(zkClient, "test2", 1, servers)
     // verify that topic test is untouched
-    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
-      res && server.getLogManager().getLog(topicAndPartition).isDefined),
+    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
       "Replicas for topic test not created")
     // test the topic path exists
     assertTrue("Topic test mistakenly deleted", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
@@ -267,8 +265,7 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // wait until replica log is created on every broker
-    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
-      res && server.getLogManager().getLog(topicAndPartition).isDefined),
+    TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicAndPartition).isDefined),
       "Replicas for topic test not created")
     servers
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 447e421..87c6315 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -63,8 +63,8 @@ trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   override def tearDown() {
-    servers.map(server => server.shutdown())
-    servers.map(server => server.config.logDirs.map(CoreUtils.rm(_)))
+    servers.foreach(_.shutdown())
+    servers.foreach(_.config.logDirs.foreach(CoreUtils.rm(_)))
     super.tearDown
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
index 1113619..12d0733 100755
--- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -40,8 +40,8 @@ class RollingBounceTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   override def tearDown() {
-    servers.map(server => server.shutdown())
-    servers.map(server => CoreUtils.rm(server.config.logDirs))
+    servers.foreach(_.shutdown())
+    servers.foreach(server => CoreUtils.rm(server.config.logDirs))
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 5b7b529..e4bf2df 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -78,8 +78,8 @@ class UncleanLeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   override def tearDown() {
-    servers.map(server => shutdownServer(server))
-    servers.map(server => CoreUtils.rm(server.config.logDirs))
+    servers.foreach(server => shutdownServer(server))
+    servers.foreach(server => CoreUtils.rm(server.config.logDirs))
 
     // restore log levels
     kafkaApisLogger.setLevel(Level.ERROR)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 0a26f5f..01dfbc4 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -48,7 +48,7 @@ class LogManagerTest extends JUnit3Suite {
     if(logManager != null)
       logManager.shutdown()
     CoreUtils.rm(logDir)
-    logManager.logDirs.map(CoreUtils.rm(_))
+    logManager.logDirs.foreach(CoreUtils.rm(_))
     super.tearDown()
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 069aa02..76d3bfd 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -107,7 +107,7 @@ class LogTest extends JUnitSuite {
 
     time.sleep(log.config.segmentMs - maxJitter)
     log.append(set)
-    assertEquals("Log does not roll on this append because it occurs earlier than max jitter", 1, log.numberOfSegments);
+    assertEquals("Log does not roll on this append because it occurs earlier than max jitter", 1, log.numberOfSegments)
     time.sleep(maxJitter - log.activeSegment.rollJitterMs + 1)
     log.append(set)
     assertEquals("Log should roll after segmentMs adjusted by random jitter", 2, log.numberOfSegments)
@@ -302,7 +302,7 @@ class LogTest extends JUnitSuite {
       assertEquals("Still no change in the logEndOffset", currOffset, log.logEndOffset)
       assertEquals("Should still be able to append and should get the logEndOffset assigned to the new append",
                    currOffset,
-                   log.append(TestUtils.singleMessageSet("hello".toString.getBytes)).firstOffset)
+                   log.append(TestUtils.singleMessageSet("hello".getBytes)).firstOffset)
 
       // cleanup the log
       log.delete()
@@ -752,7 +752,7 @@ class LogTest extends JUnitSuite {
     val topic: String = "test_topic"
     val partition:String = "143"
     val dir: File = new File(logDir + topicPartitionName(topic, partition))
-    val topicAndPartition = Log.parseTopicPartitionName(dir);
+    val topicAndPartition = Log.parseTopicPartitionName(dir)
     assertEquals(topic, topicAndPartition.asTuple._1)
     assertEquals(partition.toInt, topicAndPartition.asTuple._2)
   }
@@ -761,7 +761,7 @@ class LogTest extends JUnitSuite {
   def testParseTopicPartitionNameForEmptyName() {
     try {
       val dir: File = new File("")
-      val topicAndPartition = Log.parseTopicPartitionName(dir);
+      val topicAndPartition = Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
      case e: Exception => // it's GOOD!
@@ -772,7 +772,7 @@ class LogTest extends JUnitSuite {
   def testParseTopicPartitionNameForNull() {
     try {
       val dir: File = null
-      val topicAndPartition = Log.parseTopicPartitionName(dir);
+      val topicAndPartition = Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir)
     } catch {
      case e: Exception => // it's GOOD!
@@ -785,7 +785,7 @@ class LogTest extends JUnitSuite {
     val partition:String = "1999"
     val dir: File = new File(logDir + File.separator + topic + partition)
     try {
-      val topicAndPartition = Log.parseTopicPartitionName(dir);
+      val topicAndPartition = Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
      case e: Exception => // it's GOOD!
@@ -798,7 +798,7 @@ class LogTest extends JUnitSuite {
     val partition:String = "1999"
     val dir: File = new File(logDir + topicPartitionName(topic, partition))
     try {
-      val topicAndPartition = Log.parseTopicPartitionName(dir);
+      val topicAndPartition = Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
      case e: Exception => // it's GOOD!
@@ -811,7 +811,7 @@ class LogTest extends JUnitSuite {
     val partition:String = ""
     val dir: File = new File(logDir + topicPartitionName(topic, partition))
     try {
-      val topicAndPartition = Log.parseTopicPartitionName(dir);
+      val topicAndPartition = Log.parseTopicPartitionName(dir)
       fail("KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
     } catch {
      case e: Exception => // it's GOOD!

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 26572f7..f1977d8 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -49,8 +49,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
   }
 
   override def tearDown() {
-    servers.map(server => server.shutdown())
-    servers.map(server => CoreUtils.rm(server.config.logDirs))
+    servers.foreach(_.shutdown())
+    servers.foreach(server => CoreUtils.rm(server.config.logDirs))
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
index a67cc37..a3a03db 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
@@ -65,8 +65,9 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
       for (topic <- List(topic1, topic2)) {
         val topicAndPart = TopicAndPartition(topic, partition)
         val expectedOffset = brokers.head.getLogManager().getLog(topicAndPart).get.logEndOffset
-        result = result && expectedOffset > 0 && brokers.foldLeft(true) { (total, item) => total &&
-          (expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset) }
+        result = result && expectedOffset > 0 && brokers.forall { item =>
+          (expectedOffset == item.getLogManager().getLog(topicAndPart).get.logEndOffset)
+        }
       }
       result
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed1a548c/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
index 46a4e89..fbd245c 100644
--- a/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala
@@ -22,7 +22,7 @@ import org.junit.{Test, After, Before}
 
 class IteratorTemplateTest extends Assertions {
   
-  val lst = (0 until 10).toSeq
+  val lst = (0 until 10)
   val iterator = new IteratorTemplate[Int]() {
     var i = 0
     override def makeNext() = {
@@ -54,4 +54,4 @@ class IteratorTemplateTest extends Assertions {
     }
   }
   
-}
\ No newline at end of file
+}
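
The .toSeq dropped above was a no-op: scala.collection.immutable.Range
already extends Seq[Int]. Sketch:

    val lst: Seq[Int] = 0 until 10      // a Range is already a Seq[Int]
    assert(lst == (0 until 10).toSeq)   // identical contents either way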

