kafka-commits mailing list archives

From jjko...@apache.org
Subject git commit: KAFKA-1323; Fix regression due to KAFKA-1315 (which broke support for relative directories in the log.dirs property). Patched by Timothy Chen and Guozhang Wang; reviewed by Joel Koshy, Neha Narkhede and Jun Rao.
Date Wed, 16 Apr 2014 20:30:08 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.8.1 48f1b7490 -> 3c4ca854f


KAFKA-1323; Fix regression due to KAFKA-1315 (which broke support for
    relative directories in the log.dirs property). Patched by Timothy Chen
    and Guozhang Wang; reviewed by Joel Koshy, Neha Narkhede and Jun Rao.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3c4ca854
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3c4ca854
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3c4ca854

Branch: refs/heads/0.8.1
Commit: 3c4ca854fd2da5e5fcecdaf0856a38a9ebe4763c
Parents: 48f1b74
Author: Joel Koshy <jjkoshy@gmail.com>
Authored: Wed Apr 16 13:29:33 2014 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Wed Apr 16 13:29:33 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |  2 +-
 core/src/main/scala/kafka/log/LogManager.scala  |  2 +-
 .../scala/kafka/server/ReplicaManager.scala     |  2 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   | 78 ++++++++++++++++----
 .../kafka/log4j/KafkaLog4jAppenderTest.scala    | 37 +++++-----
 .../unit/kafka/server/ReplicaManagerTest.scala  | 55 ++++++++++----
 .../test/scala/unit/kafka/utils/TestUtils.scala | 10 +++
 7 files changed, 140 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
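
For context on the one-line fix below: java.io.File.getParent preserves the
form the File was built with, so a log directory configured with a relative
path in log.dirs yields a relative parent string that no longer matches the
checkpoint map's key, while getParentFile.getAbsolutePath collapses both forms
to one canonical key. A standalone Scala sketch of the difference (assumes a
JVM working directory of /home/user; not part of this commit):

    import java.io.File

    // A log dir created from a relative log.dirs entry stays relative.
    val logDir = new File("data/kafka-logs/test-topic-1")
    logDir.getParent                       // "data/kafka-logs" -- relative, misses the map key
    logDir.getParentFile.getAbsolutePath   // "/home/user/data/kafka-logs" -- canonical key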


http://git-wip-us.apache.org/repos/asf/kafka/blob/3c4ca854/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 810952e..8b9487c 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -88,7 +88,7 @@ class Partition(val topic: String,
         if (isReplicaLocal(replicaId)) {
          val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic))
           val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
-          val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
+          val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
           val offsetMap = checkpoint.read
           if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
             warn("No checkpointed highwatermark is found for partition [%s,%d]".format(topic,
partitionId))

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c4ca854/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index bcd2bb7..7cee543 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -52,7 +52,7 @@ class LogManager(val logDirs: Array[File],
   private val logs = new Pool[TopicAndPartition, Log]()
 
   createAndValidateLogDirs(logDirs)
-  private var dirLocks = lockLogDirs(logDirs)
+  private val dirLocks = lockLogDirs(logDirs)
   private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
   loadLogs(logDirs)
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c4ca854/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 7df56ce..ad4ffe0 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -440,7 +440,7 @@ class ReplicaManager(val config: KafkaConfig,
    */
   def checkpointHighWatermarks() {
     val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
-    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
+    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
     for((dir, reps) <- replicasByDir) {
       val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap
       try {
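
With the key normalized, the groupBy above collapses every spelling of a
directory (relative, absolute, trailing slash) into one bucket, so a single
high-watermark file is written per log directory. A standalone sketch of the
grouping with hypothetical paths (not commit code):

    import java.io.File

    // Two logs under the same data directory group under one canonical key.
    val logs = Seq(new File("/tmp/logs/topic-a-0"), new File("/tmp/logs/topic-b-0"))
    val byDir = logs.groupBy(_.getParentFile.getAbsolutePath)
    // byDir == Map("/tmp/logs" -> Seq(...))  -- one checkpoint write for this dir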

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c4ca854/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index b4bee33..be1a1ee 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -201,6 +201,7 @@ class LogManagerTest extends JUnit3Suite {
   /**
    * Test that it is not possible to open two log managers using the same data directory
    */
+  @Test
   def testTwoLogManagersUsingSameDirFails() {
     try {
       new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
@@ -209,24 +210,75 @@ class LogManagerTest extends JUnit3Suite {
       case e: KafkaException => // this is good 
     }
   }
-  
+
   /**
    * Test that recovery points are correctly written out to disk
    */
+  @Test
   def testCheckpointRecoveryPoints() {
-    val topicA = TopicAndPartition("test-a", 1)
-    val topicB = TopicAndPartition("test-b", 1)
-    val logA = this.logManager.createLog(topicA, logConfig)
-    val logB = this.logManager.createLog(topicB, logConfig)
-    for(i <- 0 until 50) 
-      logA.append(TestUtils.singleMessageSet("test".getBytes()))
-    for(i <- 0 until 100)
-      logB.append(TestUtils.singleMessageSet("test".getBytes()))
-    logA.flush()
-    logB.flush()
+    verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1), TopicAndPartition("test-b", 1)), logManager)
+  }
+
+  /**
+   * Test that recovery points directory checking works with trailing slash
+   */
+  @Test
+  def testRecoveryDirectoryMappingWithTrailingSlash() {
+    logManager.shutdown()
+    logDir = TestUtils.tempDir()
+    logManager = new LogManager(logDirs = Array(new File(logDir.getAbsolutePath + File.separator)),
+      topicConfigs = Map(),
+      defaultConfig = logConfig,
+      cleanerConfig = cleanerConfig,
+      flushCheckMs = 1000L,
+      flushCheckpointMs = 100000L,
+      retentionCheckMs = 1000L,
+      scheduler = time.scheduler,
+      time = time)
+    logManager.startup
+    verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
+  }
+
+  /**
+   * Test that recovery points directory checking works with relative directory
+   */
+  @Test
+  def testRecoveryDirectoryMappingWithRelativeDirectory() {
+    logManager.shutdown()
+    logDir = new File("data" + File.separator + logDir.getName)
+    logDir.mkdirs()
+    logDir.deleteOnExit()
+    logManager = new LogManager(logDirs = Array(logDir),
+      topicConfigs = Map(),
+      defaultConfig = logConfig,
+      cleanerConfig = cleanerConfig,
+      flushCheckMs = 1000L,
+      flushCheckpointMs = 100000L,
+      retentionCheckMs = 1000L,
+      scheduler = time.scheduler,
+      time = time)
+    logManager.startup
+    verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
+  }
+
+
+  private def verifyCheckpointRecovery(topicAndPartitions: Seq[TopicAndPartition],
+                                       logManager: LogManager) {
+    val logs = topicAndPartitions.map(this.logManager.createLog(_, logConfig))
+    logs.foreach(log => {
+      for(i <- 0 until 50)
+        log.append(TestUtils.singleMessageSet("test".getBytes()))
+
+      log.flush()
+    })
+
     logManager.checkpointRecoveryPointOffsets()
     val checkpoints = new OffsetCheckpoint(new File(logDir, logManager.RecoveryPointCheckpointFile)).read()
-    assertEquals("Recovery point should equal checkpoint", checkpoints(topicA), logA.recoveryPoint)
-    assertEquals("Recovery point should equal checkpoint", checkpoints(topicB), logB.recoveryPoint)
+
+    topicAndPartitions.zip(logs).foreach {
+      case(tp, log) => {
+        assertEquals("Recovery point should equal checkpoint", checkpoints(tp), log.recoveryPoint)
+      }
+    }
   }
 }
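
The two new mapping tests hold after the fix because java.io.File drops
trailing separators at construction and getAbsolutePath resolves relative
paths against the working directory, so every spelling of a log directory
produces the same checkpoint key. A standalone sketch (POSIX paths assumed;
not test code):

    import java.io.File

    // The trailing separator is normalized away by the File constructor.
    val withSlash    = new File("/tmp/kafka-logs/")
    val withoutSlash = new File("/tmp/kafka-logs")
    assert(withSlash.getAbsolutePath == withoutSlash.getAbsolutePath)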

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c4ca854/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
index 67497dd..4dcd41a 100644
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -6,32 +6,35 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package kafka.log4j
 
-import java.util.Properties
-import java.io.File
 import kafka.consumer.SimpleConsumer
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{TestUtils, Utils, Logging}
-import junit.framework.Assert._
 import kafka.api.FetchRequestBuilder
 import kafka.producer.async.MissingConfigException
 import kafka.serializer.Encoder
 import kafka.zk.ZooKeeperTestHarness
+
+import java.util.Properties
+import java.io.File
+
 import org.apache.log4j.spi.LoggingEvent
 import org.apache.log4j.{PropertyConfigurator, Logger}
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnit3Suite
 
+import junit.framework.Assert._
+
 class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
   var logDirZk: File = null
@@ -72,8 +75,8 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     var props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -82,15 +85,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
-    }catch {
+    } catch {
       case e: MissingConfigException =>
     }
 
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -99,15 +102,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
-    }catch {
+    } catch {
       case e: MissingConfigException =>
     }
 
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.SerializerClass", "kafka.log4j.AppenderStringEncoder")
     props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -116,15 +119,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
-    }catch {
+    } catch {
       case e: MissingConfigException =>
     }
 
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
@@ -132,7 +135,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     // serializer missing
     try {
       PropertyConfigurator.configure(props)
-    }catch {
+    } catch {
       case e: MissingConfigException => fail("should default to kafka.serializer.StringEncoder")
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c4ca854/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b5936d4..57688f1 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -5,8 +5,8 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,30 +17,59 @@
 
 package kafka.server
 
-import org.scalatest.junit.JUnit3Suite
-import org.junit.Test
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
+import kafka.log.{CleanerConfig, LogManager, LogConfig}
+
 import java.util.concurrent.atomic.AtomicBoolean
 import java.io.File
+
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
-import kafka.cluster.Replica
-import kafka.log.{LogManager, LogConfig, Log}
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Test
 
 class ReplicaManagerTest extends JUnit3Suite {
+
+  val topic = "test-topic"
+
   @Test
-  def testHighwaterMarkDirectoryMapping() {
+  def testHighWaterMarkDirectoryMapping() {
     val props = TestUtils.createBrokerConfig(1)
-    val dir = "/tmp/kafka-logs/"
-    new File(dir).mkdir()
-    props.setProperty("log.dirs", dir)
     val config = new KafkaConfig(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val mockLogMgr = EasyMock.createMock(classOf[LogManager])
+    val mockLogMgr = createLogManager(config.logDirs.map(new File(_)).toArray)
     val time: MockTime = new MockTime()
     val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
-    val partition = rm.getOrCreatePartition("test-topic", 1, 1)
-    partition.addReplicaIfNotExists(new Replica(1, partition, time, 0L, Option(new Log(new File("/tmp/kafka-logs/test-topic-1"), new LogConfig(), 0L, null))))
+    val partition = rm.getOrCreatePartition(topic, 1, 1)
+    partition.getOrCreateReplica(1)
     rm.checkpointHighWatermarks()
   }
+
+  @Test
+  def testHighwaterMarkRelativeDirectoryMapping() {
+    val props = TestUtils.createBrokerConfig(1)
+    props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+    val config = new KafkaConfig(props)
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    val mockLogMgr = createLogManager(config.logDirs.map(new File(_)).toArray)
+    val time: MockTime = new MockTime()
+    val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
+    val partition = rm.getOrCreatePartition(topic, 1, 1)
+    partition.getOrCreateReplica(1)
+    rm.checkpointHighWatermarks()
+  }
+
+  private def createLogManager(logDirs: Array[File]): LogManager = {
+    val time = new MockTime()
+    return new LogManager(logDirs,
+      topicConfigs = Map(),
+      defaultConfig = new LogConfig(),
+      cleanerConfig = CleanerConfig(enableCleaner = false),
+      flushCheckMs = 1000L,
+      flushCheckpointMs = 100000L,
+      retentionCheckMs = 1000L,
+      scheduler = time.scheduler,
+      time = time)
+  }
+
 }
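
Both tests now build a real LogManager and call getOrCreateReplica, which
drives the highWatermarkCheckpoints lookup that regressed; the old EasyMock
LogManager never reached it. A hypothetical repro of the pre-fix failure
(standalone sketch, not commit code):

    import java.io.File

    // Checkpoint map keyed by absolute paths, as ReplicaManager builds it.
    val checkpoints = Map(new File("data/kafka-logs").getAbsolutePath -> "hw-checkpoint")
    val logDir = new File("data/kafka-logs/test-topic-1")
    // checkpoints(logDir.getParent)                   // pre-fix: NoSuchElementException
    checkpoints(logDir.getParentFile.getAbsolutePath)  // post-fix: key matches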

http://git-wip-us.apache.org/repos/asf/kafka/blob/3c4ca854/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 500eeca..33ced14 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -85,6 +85,16 @@ object TestUtils extends Logging {
   }
 
   /**
+   * Create a temporary relative directory
+   */
+  def tempRelativeDir(parent: String): File = {
+    val f = new File(parent, "kafka-" + random.nextInt(1000000))
+    f.mkdirs()
+    f.deleteOnExit()
+    f
+  }
+
+  /**
    * Create a temporary file
    */
   def tempFile(): File = {
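
A hypothetical usage sketch of the new helper (not part of this commit): the
returned File is relative because its parent string is, which is what the
relative-directory tests need. Note that deleteOnExit only removes the
directory if it is empty at JVM shutdown.

    // Relative until explicitly resolved; feed it to a broker config as-is
    // or via getAbsolutePath, as the new ReplicaManagerTest does.
    val relDir = TestUtils.tempRelativeDir("data")   // e.g. data/kafka-123456
    assert(!relDir.isAbsolute)
    val props = TestUtils.createBrokerConfig(1)
    props.put("log.dir", relDir.getPath)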

