kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: Remove a couple of redundant `CoreUtils.rm` methods
Date Mon, 28 Mar 2016 21:35:36 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4c0660bf3 -> 43d5078e9


MINOR: Remove a couple of redundant `CoreUtils.rm` methods

Also:
* Rename remaining `CoreUtils.rm` to `delete` for consistency
* Use `try with resources` in `Utils` to simplify code
* Silence compiler warning due to exception catch clause in `TestUtils`

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1153 from ijuma/remove-redundant-core-utils-rm


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/43d5078e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/43d5078e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/43d5078e

Branch: refs/heads/trunk
Commit: 43d5078e981bbb25fd81cdc8ba4c339cd2d3f3d2
Parents: 4c0660b
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Mon Mar 28 14:35:31 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Mar 28 14:35:31 2016 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/utils/Utils.java    | 18 ++++---------
 core/src/main/scala/kafka/log/Log.scala         |  3 ++-
 .../kafka/metrics/KafkaCSVMetricsReporter.scala |  7 ++---
 core/src/main/scala/kafka/utils/CoreUtils.scala | 27 +-------------------
 .../kafka/api/ProducerCompressionTest.scala     |  2 +-
 .../test/scala/other/kafka/StressTestLog.scala  |  5 ++--
 .../other/kafka/TestLinearWriteSpeed.scala      |  7 +++--
 .../unit/kafka/admin/AddPartitionsTest.scala    |  2 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala | 12 +++++----
 .../integration/KafkaServerTestHarness.scala    |  2 +-
 .../kafka/integration/RollingBounceTest.scala   |  2 +-
 .../integration/UncleanLeaderElectionTest.scala |  2 +-
 .../unit/kafka/log/BrokerCompressionTest.scala  |  9 +++----
 .../test/scala/unit/kafka/log/CleanerTest.scala |  2 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |  5 ++--
 .../scala/unit/kafka/log/LogManagerTest.scala   |  5 ++--
 .../src/test/scala/unit/kafka/log/LogTest.scala |  6 ++---
 .../unit/kafka/producer/ProducerTest.scala      |  4 +--
 .../unit/kafka/server/AdvertiseBrokerTest.scala |  4 +--
 .../server/HighwatermarkPersistenceTest.scala   |  7 +++--
 .../unit/kafka/server/LeaderElectionTest.scala  |  2 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |  3 ++-
 .../unit/kafka/server/LogRecoveryTest.scala     |  3 ++-
 .../unit/kafka/server/OffsetCommitTest.scala    |  9 +++----
 .../server/ServerGenerateBrokerIdTest.scala     | 18 ++++++-------
 .../unit/kafka/server/ServerShutdownTest.scala  |  6 ++---
 .../unit/kafka/server/ServerStartupTest.scala   |  6 ++---
 .../test/scala/unit/kafka/utils/TestUtils.scala | 10 ++++----
 .../scala/unit/kafka/zk/EmbeddedZookeeper.scala |  9 +++----
 29 files changed, 86 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 4c4225b..0167548 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -442,13 +442,8 @@ public class Utils {
      */
     public static Properties loadProps(String filename) throws IOException, FileNotFoundException {
         Properties props = new Properties();
-        InputStream propStream = null;
-        try {
-            propStream = new FileInputStream(filename);
+        try (InputStream propStream = new FileInputStream(filename)) {
             props.load(propStream);
-        } finally {
-            if (propStream != null)
-                propStream.close();
         }
         return props;
     }
@@ -540,16 +535,13 @@ public class Utils {
      */
     public static String readFileAsString(String path, Charset charset) throws IOException {
         if (charset == null) charset = Charset.defaultCharset();
-        FileInputStream stream = new FileInputStream(new File(path));
-        String result = new String();
-        try {
+
+        try (FileInputStream stream = new FileInputStream(new File(path))) {
             FileChannel fc = stream.getChannel();
             MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size());
-            result = charset.decode(bb).toString();
-        } finally {
-            stream.close();
+            return charset.decode(bb).toString();
         }
-        return result;
+
     }
 
     public static String readFileAsString(String path) throws IOException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 8c956f7..81c19fa 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.record.TimestampType
 
 import scala.collection.JavaConversions
 import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.utils.Utils
 
 object LogAppendInfo {
   val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, false)
@@ -714,7 +715,7 @@ class Log(val dir: File,
       removeLogMetrics()
       logSegments.foreach(_.delete())
       segments.clear()
-      CoreUtils.rm(dir)
+      Utils.delete(dir)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
index cc0da9f..686f692 100755
--- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala
@@ -22,14 +22,15 @@ package kafka.metrics
 
 import com.yammer.metrics.Metrics
 import java.io.File
+
 import com.yammer.metrics.reporting.CsvReporter
 import java.util.concurrent.TimeUnit
-import kafka.utils.{CoreUtils, VerifiableProperties, Logging}
 
+import kafka.utils.{Logging, VerifiableProperties}
+import org.apache.kafka.common.utils.Utils
 
 private trait KafkaCSVMetricsReporterMBean extends KafkaMetricsReporterMBean
 
-
 private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
                               with KafkaCSVMetricsReporterMBean
                               with Logging {
@@ -48,7 +49,7 @@ private class KafkaCSVMetricsReporter extends KafkaMetricsReporter
       if (!initialized) {
         val metricsConfig = new KafkaMetricsConfig(props)
         csvDir = new File(props.getString("kafka.csv.metrics.dir", "kafka_metrics"))
-        CoreUtils.rm(csvDir)
+        Utils.delete(csvDir)
         csvDir.mkdirs()
         underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir)
         if (props.getBoolean("kafka.csv.metrics.reporter.enabled", default = false)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index b01f5cc..fe2bebf 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -79,35 +79,10 @@ object CoreUtils extends Logging {
   }
 
   /**
-   * Recursively delete the given file/directory and any subfiles (if any exist)
-   * @param file The root file at which to begin deleting
-   */
-  def rm(file: String): Unit = rm(new File(file))
-
-  /**
    * Recursively delete the list of files/directories and any subfiles (if any exist)
    * @param files sequence of files to be deleted
    */
-  def rm(files: Seq[String]): Unit = files.foreach(f => rm(new File(f)))
-
-  /**
-   * Recursively delete the given file/directory and any subfiles (if any exist)
-   * @param file The root file at which to begin deleting
-   */
-  def rm(file: File) {
-	  if(file == null) {
-	    return
-	  } else if(file.isDirectory) {
-	    val files = file.listFiles()
-	    if(files != null) {
-	      for(f <- files)
-	        rm(f)
-	    }
-	    file.delete()
-	  } else {
-	    file.delete()
-	  }
-  }
+  def delete(files: Seq[String]): Unit = files.foreach(f => Utils.delete(new File(f)))
 
   /**
    * Register the given mbean with the platform mbean server,

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index c4a2bd7..fc1ceec 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -55,7 +55,7 @@ class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness
   @After
   override def tearDown() {
     server.shutdown
-    CoreUtils.rm(server.config.logDirs)
+    CoreUtils.delete(server.config.logDirs)
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index dead0eb..8adc7e2 100755
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -19,11 +19,12 @@ package kafka
 
 import java.util.Properties
 import java.util.concurrent.atomic._
-import kafka.common._
+
 import kafka.message._
 import kafka.log._
 import kafka.utils._
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException
+import org.apache.kafka.common.utils.Utils
 
 /**
  * A stress test that instantiates a log and then runs continual appends against it from one thread and continual reads against it
@@ -55,7 +56,7 @@ object StressTestLog {
         running.set(false)
         writer.join()
         reader.join()
-        CoreUtils.rm(dir)
+        Utils.delete(dir)
       }
     })
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
index 236d857..db281bf 100755
--- a/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
+++ b/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
@@ -21,11 +21,14 @@ import java.io._
 import java.nio._
 import java.nio.channels._
 import java.util.{Properties, Random}
+
 import kafka.log._
 import kafka.utils._
 import kafka.message._
+
 import scala.math._
 import joptsimple._
+import org.apache.kafka.common.utils.Utils
 
 /**
  * This test does linear writes using either a kafka log or a file and measures throughput and latency.
@@ -196,7 +199,7 @@ object TestLinearWriteSpeed {
   }
   
   class LogWritable(val dir: File, config: LogConfig, scheduler: Scheduler, val messages: ByteBufferMessageSet) extends Writable {
-    CoreUtils.rm(dir)
+    Utils.delete(dir)
     val log = new Log(dir, config, 0L, scheduler, SystemTime)
     def write(): Int = {
       log.append(messages, true)
@@ -204,7 +207,7 @@ object TestLinearWriteSpeed {
     }
     def close() {
       log.close()
-      CoreUtils.rm(log.dir)
+      Utils.delete(log.dir)
     }
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index b9bbace..ab8d363 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -59,7 +59,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   @After
   override def tearDown() {
     servers.foreach(_.shutdown())
-    servers.foreach(server => CoreUtils.rm(server.config.logDirs))
+    servers.foreach(server => CoreUtils.delete(server.config.logDirs))
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 8910e09..21bb6ab 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -22,13 +22,15 @@ import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.Assert._
 import org.junit.Test
 import java.util.Properties
+
 import kafka.utils._
 import kafka.log._
 import kafka.zk.ZooKeeperTestHarness
-import kafka.utils.{Logging, ZkUtils, TestUtils}
-import kafka.common.{TopicExistsException, TopicAndPartition}
-import kafka.server.{ConfigType, KafkaServer, KafkaConfig}
+import kafka.utils.{Logging, TestUtils, ZkUtils}
+import kafka.common.{TopicAndPartition, TopicExistsException}
+import kafka.server.{ConfigType, KafkaConfig, KafkaServer}
 import java.io.File
+
 import TestUtils._
 
 import scala.collection.{Map, immutable}
@@ -418,7 +420,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
       assertEquals(newConfig, configInZk)
     } finally {
       server.shutdown()
-      server.config.logDirs.foreach(CoreUtils.rm(_))
+      CoreUtils.delete(server.config.logDirs)
     }
   }
 
@@ -449,7 +451,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
       assertEquals(new Quota(2000, true), server.apis.quotaManagers(ApiKeys.FETCH.id).quota(clientId))
     } finally {
       server.shutdown()
-      server.config.logDirs.foreach(CoreUtils.rm(_))
+      CoreUtils.delete(server.config.logDirs)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 870b9ad..676772f 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -87,7 +87,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
   @After
   override def tearDown() {
     servers.foreach(_.shutdown())
-    servers.foreach(_.config.logDirs.foreach(CoreUtils.rm(_)))
+    servers.foreach(server => CoreUtils.delete(server.config.logDirs))
     super.tearDown
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
index b931568..5221855 100755
--- a/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala
@@ -43,7 +43,7 @@ class RollingBounceTest extends ZooKeeperTestHarness {
   @After
   override def tearDown() {
     servers.foreach(_.shutdown())
-    servers.foreach(server => CoreUtils.rm(server.config.logDirs))
+    servers.foreach(server => CoreUtils.delete(server.config.logDirs))
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index b725d8b..a8ba283 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -82,7 +82,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
   @After
   override def tearDown() {
     servers.foreach(server => shutdownServer(server))
-    servers.foreach(server => CoreUtils.rm(server.config.logDirs))
+    servers.foreach(server => CoreUtils.delete(server.config.logDirs))
 
     // restore log levels
     kafkaApisLogger.setLevel(Level.ERROR)

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
index d0cb4a1..7487bc5 100755
--- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala
@@ -17,7 +17,6 @@
 
 package kafka.log
 
-import java.io.File
 import kafka.utils._
 import kafka.message._
 import org.scalatest.junit.JUnitSuite
@@ -26,9 +25,9 @@ import org.junit.Assert._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
-import java.util.{Properties, Collection, ArrayList}
-import kafka.server.KafkaConfig
 import org.apache.kafka.common.record.CompressionType
+import org.apache.kafka.common.utils.Utils
+import java.util.{Collection, Properties}
 import scala.collection.JavaConversions._
 
 @RunWith(value = classOf[Parameterized])
@@ -41,7 +40,7 @@ class BrokerCompressionTest(messageCompression: String, brokerCompression: Strin
 
   @After
   def tearDown() {
-    CoreUtils.rm(tmpDir)
+    Utils.delete(tmpDir)
   }
 
   /**
@@ -78,4 +77,4 @@ object BrokerCompressionTest {
          messageCompression <- CompressionType.values
     ) yield Array(messageCompression.name, brokerCompression)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 3773233..b6849f0 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -49,7 +49,7 @@ class CleanerTest extends JUnitSuite {
   
   @After
   def teardown() {
-    CoreUtils.rm(tmpdir)
+    Utils.delete(tmpdir)
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 6b91611..cc9873c 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -25,6 +25,7 @@ import kafka.message._
 import kafka.server.OffsetCheckpoint
 import kafka.utils._
 import org.apache.kafka.common.record.CompressionType
+import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
 import org.junit._
 import org.junit.runner.RunWith
@@ -119,7 +120,7 @@ class LogCleanerIntegrationTest(compressionCodec: String) {
   @After
   def teardown() {
     time.scheduler.shutdown()
-    CoreUtils.rm(logDir)
+    Utils.delete(logDir)
   }
   
   /* create a cleaner instance and logs with the given parameters */
@@ -165,4 +166,4 @@ object LogCleanerIntegrationTest {
       list.add(Array(codec.name))
     list
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 46bfbed..f290d54 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -24,6 +24,7 @@ import kafka.common._
 import kafka.server.OffsetCheckpoint
 import kafka.utils._
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
+import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
@@ -54,8 +55,8 @@ class LogManagerTest {
   def tearDown() {
     if(logManager != null)
       logManager.shutdown()
-    CoreUtils.rm(logDir)
-    logManager.logDirs.foreach(CoreUtils.rm(_))
+    Utils.delete(logDir)
+    logManager.logDirs.foreach(Utils.delete)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index c2eb817..4d75d53 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -19,7 +19,6 @@ package kafka.log
 
 import java.io._
 import java.util.Properties
-import java.util.concurrent.atomic._
 
 import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException}
 import kafka.api.ApiVersion
@@ -30,6 +29,7 @@ import org.junit.{After, Before, Test}
 import kafka.message._
 import kafka.utils._
 import kafka.server.KafkaConfig
+import org.apache.kafka.common.utils.Utils
 
 class LogTest extends JUnitSuite {
 
@@ -47,7 +47,7 @@ class LogTest extends JUnitSuite {
 
   @After
   def tearDown() {
-    CoreUtils.rm(tmpDir)
+    Utils.delete(tmpDir)
   }
 
   def createEmptyLogs(dir: File, offsets: Int*) {
@@ -810,7 +810,7 @@ class LogTest extends JUnitSuite {
       log = new Log(logDir, config, recoveryPoint, time.scheduler, time)
       assertEquals(numMessages, log.logEndOffset)
       assertEquals("Messages in the log after recovery should be the same.", messages, log.logSegments.flatMap(_.log.iterator.toList))
-      CoreUtils.rm(logDir)
+      Utils.delete(logDir)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 4a1ad5a..cf25cdb 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -96,8 +96,8 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
 
     server1.shutdown
     server2.shutdown
-    CoreUtils.rm(server1.config.logDirs)
-    CoreUtils.rm(server2.config.logDirs)
+    CoreUtils.delete(server1.config.logDirs)
+    CoreUtils.delete(server2.config.logDirs)
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
index 75fa664..dc17aa4 100755
--- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
@@ -43,7 +43,7 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
   @After
   override def tearDown() {
     server.shutdown()
-    CoreUtils.rm(server.config.logDirs)
+    CoreUtils.delete(server.config.logDirs)
     super.tearDown()
   }
 
@@ -55,4 +55,4 @@ class AdvertiseBrokerTest extends ZooKeeperTestHarness {
     assertEquals(advertisedPort, endpoint.port)
   }
   
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 2e66601..26e2817 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -18,16 +18,15 @@ package kafka.server
 
 import kafka.log._
 import java.io.File
-import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.{Utils, MockTime => JMockTime}
 import org.easymock.EasyMock
 import org.junit._
 import org.junit.Assert._
 import kafka.common._
 import kafka.cluster.Replica
-import kafka.utils.{ZkUtils, SystemTime, KafkaScheduler, TestUtils, MockTime, CoreUtils}
+import kafka.utils.{KafkaScheduler, MockTime, SystemTime, TestUtils, ZkUtils}
 import java.util.concurrent.atomic.AtomicBoolean
-import org.apache.kafka.common.utils.{MockTime => JMockTime}
 
 class HighwatermarkPersistenceTest {
 
@@ -42,7 +41,7 @@ class HighwatermarkPersistenceTest {
   @After
   def teardown() {
     for(manager <- logManagers; dir <- manager.logDirs)
-      CoreUtils.rm(dir)
+      Utils.delete(dir)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index e84780a..7258980 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -58,7 +58,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
   @After
   override def tearDown() {
     servers.foreach(_.shutdown())
-    servers.foreach(server => CoreUtils.rm(server.config.logDirs))
+    servers.foreach(server => CoreUtils.delete(server.config.logDirs))
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 8c86a7b..d5c696e 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -30,6 +30,7 @@ import kafka.utils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
@@ -57,7 +58,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
   override def tearDown() {
     simpleConsumer.close
     server.shutdown
-    CoreUtils.rm(logDir)
+    Utils.delete(logDir)
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index e13bfd9..d37de76 100755
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -26,6 +26,7 @@ import java.io.File
 
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
+import org.apache.kafka.common.utils.Utils
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 
@@ -94,7 +95,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness {
     producer.close()
     for (server <- servers) {
       server.shutdown()
-      CoreUtils.rm(server.config.logDirs(0))
+      Utils.delete(new File(server.config.logDirs(0)))
     }
     super.tearDown()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 1d5148b..29eaf2d 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -19,13 +19,14 @@ package kafka.server
 
 import kafka.api.{GroupCoordinatorRequest, OffsetCommitRequest, OffsetFetchRequest}
 import kafka.consumer.SimpleConsumer
-import kafka.common.{OffsetMetadata, OffsetMetadataAndError, OffsetAndMetadata, TopicAndPartition}
+import kafka.common.{OffsetAndMetadata, OffsetMetadata, OffsetMetadataAndError, TopicAndPartition}
 import kafka.utils._
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.Errors
-
+import org.apache.kafka.common.utils.Utils
 import org.junit.{After, Before, Test}
+import org.junit.Assert._
 
 import java.util.Properties
 import java.io.File
@@ -33,8 +34,6 @@ import java.io.File
 import scala.util.Random
 import scala.collection._
 
-import org.junit.Assert._
-
 class OffsetCommitTest extends ZooKeeperTestHarness {
   val random: Random = new Random()
   val group = "test-group"
@@ -71,7 +70,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
   override def tearDown() {
     simpleConsumer.close
     server.shutdown
-    CoreUtils.rm(logDir)
+    Utils.delete(logDir)
     super.tearDown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
index c26ff13..8e25366 100755
--- a/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
@@ -51,7 +51,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     server1.startup()
     assertEquals(server1.config.brokerId, 1001)
     server1.shutdown()
-    CoreUtils.rm(server1.config.logDirs)
+    CoreUtils.delete(server1.config.logDirs)
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
@@ -75,9 +75,9 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     assertTrue(verifyBrokerMetadata(server1.config.logDirs,1001))
     assertTrue(verifyBrokerMetadata(server2.config.logDirs,0))
     assertTrue(verifyBrokerMetadata(server3.config.logDirs,1002))
-    CoreUtils.rm(server1.config.logDirs)
-    CoreUtils.rm(server2.config.logDirs)
-    CoreUtils.rm(server3.config.logDirs)
+    CoreUtils.delete(server1.config.logDirs)
+    CoreUtils.delete(server2.config.logDirs)
+    CoreUtils.delete(server3.config.logDirs)
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
@@ -93,7 +93,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     assertEquals(server3.config.brokerId,3)
     server3.shutdown()
     assertTrue(verifyBrokerMetadata(server3.config.logDirs,3))
-    CoreUtils.rm(server3.config.logDirs)
+    CoreUtils.delete(server3.config.logDirs)
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
@@ -116,7 +116,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     server1.startup()
     server1.shutdown()
     assertTrue(verifyBrokerMetadata(config1.logDirs, 1001))
-    CoreUtils.rm(server1.config.logDirs)
+    CoreUtils.delete(server1.config.logDirs)
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
@@ -133,7 +133,7 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
       case e: kafka.common.InconsistentBrokerIdException => //success
     }
     server1.shutdown()
-    CoreUtils.rm(server1.config.logDirs)
+    CoreUtils.delete(server1.config.logDirs)
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 
@@ -170,8 +170,8 @@ class ServerGenerateBrokerIdTest extends ZooKeeperTestHarness {
     // verify correct broker metadata was written
     assertTrue(verifyBrokerMetadata(serverA.config.logDirs,1))
     assertTrue(verifyBrokerMetadata(newServerB.config.logDirs,2))
-    CoreUtils.rm(serverA.config.logDirs)
-    CoreUtils.rm(newServerB.config.logDirs)
+    CoreUtils.delete(serverA.config.logDirs)
+    CoreUtils.delete(newServerB.config.logDirs)
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 67f62d9..bc71edd 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -104,7 +104,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
     consumer.close()
     producer.close()
     server.shutdown()
-    CoreUtils.rm(server.config.logDirs)
+    CoreUtils.delete(server.config.logDirs)
     verifyNonDaemonThreadsStatus
   }
 
@@ -117,7 +117,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
     server.startup()
     server.shutdown()
     server.awaitShutdown()
-    CoreUtils.rm(server.config.logDirs)
+    CoreUtils.delete(server.config.logDirs)
     verifyNonDaemonThreadsStatus
   }
 
@@ -145,7 +145,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
         server.shutdown()
       server.awaitShutdown()
     }
-    CoreUtils.rm(server.config.logDirs)
+    CoreUtils.delete(server.config.logDirs)
     verifyNonDaemonThreadsStatus
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index b321a02..9b49365 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -40,7 +40,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
     assertTrue(pathExists)
 
     server.shutdown()
-    CoreUtils.rm(server.config.logDirs)
+    CoreUtils.delete(server.config.logDirs)
   }
 
   @Test
@@ -66,7 +66,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
     assertEquals(brokerRegistration, zkUtils.readData(ZkUtils.BrokerIdsPath + "/" + brokerId)._1)
 
     server1.shutdown()
-    CoreUtils.rm(server1.config.logDirs)
+    CoreUtils.delete(server1.config.logDirs)
   }
 
   @Test
@@ -80,6 +80,6 @@ class ServerStartupTest extends ZooKeeperTestHarness {
     assertEquals(brokerId, server.metadataCache.getAliveBrokers.head.id)
 
     server.shutdown()
-    CoreUtils.rm(server.config.logDirs)
+    CoreUtils.delete(server.config.logDirs)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 0730468..a1e7912 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -21,9 +21,8 @@ import java.io._
 import java.nio._
 import java.nio.file.Files
 import java.nio.channels._
-import java.util
-import java.util.concurrent.{Callable, TimeUnit, Executors}
-import java.util.{Collections, Random, Properties}
+import java.util.concurrent.{Callable, Executors, TimeUnit}
+import java.util.{Collections, Properties, Random}
 import java.security.cert.X509Certificate
 import javax.net.ssl.X509TrustManager
 import charset.Charset
@@ -52,6 +51,7 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.network.Mode
 import org.apache.kafka.common.record.CompressionType
 import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer}
+import org.apache.kafka.common.utils.Utils
 
 import scala.collection.Map
 import scala.collection.JavaConversions._
@@ -100,7 +100,7 @@ object TestUtils extends Logging {
 
     Runtime.getRuntime().addShutdownHook(new Thread() {
       override def run() = {
-        CoreUtils.rm(f)
+        Utils.delete(f)
       }
     })
     f
@@ -1115,7 +1115,7 @@ object TestUtils extends Logging {
       }
     } catch {
       case ie: InterruptedException => failWithTimeout()
-      case e => exceptions += e
+      case e: Throwable => exceptions += e
     } finally {
       threadPool.shutdownNow()
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/43d5078e/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
index 5fa2f65..1030c46 100755
--- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
+++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
@@ -21,10 +21,9 @@ import org.apache.zookeeper.server.ZooKeeperServer
 import org.apache.zookeeper.server.NIOServerCnxnFactory
 import kafka.utils.TestUtils
 import java.net.InetSocketAddress
-import javax.security.auth.login.Configuration
+
 import kafka.utils.CoreUtils
-import org.apache.kafka.common.security.JaasUtils
-import org.apache.kafka.common.utils.Utils.getPort
+import org.apache.kafka.common.utils.Utils
 
 class EmbeddedZookeeper() {
   val snapshotDir = TestUtils.tempDir()
@@ -40,8 +39,8 @@ class EmbeddedZookeeper() {
   def shutdown() {
     CoreUtils.swallow(zookeeper.shutdown())
     CoreUtils.swallow(factory.shutdown())
-    CoreUtils.rm(logDir)
-    CoreUtils.rm(snapshotDir)
+    Utils.delete(logDir)
+    Utils.delete(snapshotDir)
   }
   
 }


Mime
View raw message