kafka-commits mailing list archives

From jjko...@apache.org
Subject git commit: KAFKA-1323; Fix regression due to KAFKA-1315 (support for relative directories in log.dirs property broke). Patched by Timothy Chen and Guozhang Wang; reviewed by Joel Koshy, Neha Narkhede and Jun Rao.
Date Wed, 16 Apr 2014 17:34:09 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ec075c5a8 -> 97f13ec73


KAFKA-1323; Fix regression due to KAFKA-1315 (support for relative
directories in log.dirs property broke). Patched by Timothy Chen and
Guozhang Wang; reviewed by Joel Koshy, Neha Narkhede and Jun Rao.
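
The root cause, in short: java.io.File.getParent preserves the spelling of
the configured path, so a relative log.dirs entry produces a relative lookup
key, while the high-watermark checkpoint map (as the fixed lookups below
suggest) is keyed by absolute log-dir paths, and the lookup misses. A
minimal standalone sketch of the two key forms (illustrative only, not part
of this commit; the path is hypothetical):

    import java.io.File

    object RelativeDirKey extends App {
      // A log directory configured with a relative path, as supported
      // since KAFKA-1315.
      val logDir = new File("data" + File.separator + "test-topic-1")

      // Pre-fix key: stays relative because the configured dir was relative.
      println(logDir.getParent)                     // prints: data

      // Post-fix key: resolved against the working directory, so it can
      // match a map keyed by absolute log-dir paths.
      println(logDir.getParentFile.getAbsolutePath) // e.g. /opt/kafka/data
    }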


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/97f13ec7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/97f13ec7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/97f13ec7

Branch: refs/heads/trunk
Commit: 97f13ec73255f3978c1cbb80ea26f446cf756515
Parents: ec075c5
Author: Joel Koshy <jjkoshy@gmail.com>
Authored: Tue Apr 15 14:08:21 2014 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Wed Apr 16 10:31:36 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |  2 +-
 core/src/main/scala/kafka/log/LogManager.scala  |  2 +-
 .../scala/kafka/server/ReplicaManager.scala     |  2 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   | 78 ++++++++++++++++----
 .../kafka/log4j/KafkaLog4jAppenderTest.scala    | 27 ++++---
 .../unit/kafka/server/ReplicaManagerTest.scala  | 48 +++++++++---
 .../test/scala/unit/kafka/utils/TestUtils.scala | 10 +++
 7 files changed, 132 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/97f13ec7/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 6d6b4ab..c08eab0 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -90,7 +90,7 @@ class Partition(val topic: String,
         if (isReplicaLocal(replicaId)) {
           val config = LogConfig.fromProps(logManager.defaultConfig.toProps, AdminUtils.fetchTopicConfig(zkClient, topic))
           val log = logManager.createLog(TopicAndPartition(topic, partitionId), config)
-          val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParent)
+          val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
           val offsetMap = checkpoint.read
           if (!offsetMap.contains(TopicAndPartition(topic, partitionId)))
             warn("No checkpointed highwatermark is found for partition [%s,%d]".format(topic, partitionId))

http://git-wip-us.apache.org/repos/asf/kafka/blob/97f13ec7/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 8ed9b68..ac67f08 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -52,7 +52,7 @@ class LogManager(val logDirs: Array[File],
   private val logs = new Pool[TopicAndPartition, Log]()
 
   createAndValidateLogDirs(logDirs)
-  private var dirLocks = lockLogDirs(logDirs)
+  private val dirLocks = lockLogDirs(logDirs)
   private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
   loadLogs(logDirs)
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/97f13ec7/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0f5aaa9..5588f59 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -462,7 +462,7 @@ class ReplicaManager(val config: KafkaConfig,
    */
   def checkpointHighWatermarks() {
     val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
-    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
+    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
     for((dir, reps) <- replicasByDir) {
       val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap
       try {
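
The effect of the new groupBy key can be checked in isolation: getParent
preserves however the path was originally spelled, while
getParentFile.getAbsolutePath collapses relative and absolute spellings of
the same physical directory into one key. A standalone sketch (illustrative
only, not part of this commit):

    import java.io.File

    object GroupByDirKey extends App {
      // Two logs under the same physical parent dir: one path relative,
      // one absolute.
      val dirs = Seq(new File("data", "t-0"),
                     new File(new File("data").getAbsolutePath, "t-1"))

      // Old key: two buckets, "data" and "/.../data", for one directory.
      println(dirs.groupBy(_.getParent).keySet)

      // New key: a single bucket, however the paths were written.
      println(dirs.groupBy(_.getParentFile.getAbsolutePath).keySet)
    }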

http://git-wip-us.apache.org/repos/asf/kafka/blob/97f13ec7/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index b4bee33..be1a1ee 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -201,6 +201,7 @@ class LogManagerTest extends JUnit3Suite {
   /**
    * Test that it is not possible to open two log managers using the same data directory
    */
+  @Test
   def testTwoLogManagersUsingSameDirFails() {
     try {
       new LogManager(Array(logDir), Map(), logConfig, cleanerConfig, 1000L, 10000L, 1000L, time.scheduler, time)
@@ -209,24 +210,75 @@ class LogManagerTest extends JUnit3Suite {
       case e: KafkaException => // this is good 
     }
   }
-  
+
   /**
    * Test that recovery points are correctly written out to disk
    */
+  @Test
   def testCheckpointRecoveryPoints() {
-    val topicA = TopicAndPartition("test-a", 1)
-    val topicB = TopicAndPartition("test-b", 1)
-    val logA = this.logManager.createLog(topicA, logConfig)
-    val logB = this.logManager.createLog(topicB, logConfig)
-    for(i <- 0 until 50) 
-      logA.append(TestUtils.singleMessageSet("test".getBytes()))
-    for(i <- 0 until 100)
-      logB.append(TestUtils.singleMessageSet("test".getBytes()))
-    logA.flush()
-    logB.flush()
+    verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1), TopicAndPartition("test-b", 1)), logManager)
+  }
+
+  /**
+   * Test that recovery points directory checking works with trailing slash
+   */
+  @Test
+  def testRecoveryDirectoryMappingWithTrailingSlash() {
+    logManager.shutdown()
+    logDir = TestUtils.tempDir()
+    logManager = new LogManager(logDirs = Array(new File(logDir.getAbsolutePath + File.separator)),
+      topicConfigs = Map(),
+      defaultConfig = logConfig,
+      cleanerConfig = cleanerConfig,
+      flushCheckMs = 1000L,
+      flushCheckpointMs = 100000L,
+      retentionCheckMs = 1000L,
+      scheduler = time.scheduler,
+      time = time)
+    logManager.startup
+    verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
+  }
+
+  /**
+   * Test that recovery points directory checking works with relative directory
+   */
+  @Test
+  def testRecoveryDirectoryMappingWithRelativeDirectory() {
+    logManager.shutdown()
+    logDir = new File("data" + File.separator + logDir.getName)
+    logDir.mkdirs()
+    logDir.deleteOnExit()
+    logManager = new LogManager(logDirs = Array(logDir),
+      topicConfigs = Map(),
+      defaultConfig = logConfig,
+      cleanerConfig = cleanerConfig,
+      flushCheckMs = 1000L,
+      flushCheckpointMs = 100000L,
+      retentionCheckMs = 1000L,
+      scheduler = time.scheduler,
+      time = time)
+    logManager.startup
+    verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
+  }
+
+
+  private def verifyCheckpointRecovery(topicAndPartitions: Seq[TopicAndPartition],
+                                       logManager: LogManager) {
+    val logs = topicAndPartitions.map(this.logManager.createLog(_, logConfig))
+    logs.foreach(log => {
+      for(i <- 0 until 50)
+        log.append(TestUtils.singleMessageSet("test".getBytes()))
+
+      log.flush()
+    })
+
     logManager.checkpointRecoveryPointOffsets()
     val checkpoints = new OffsetCheckpoint(new File(logDir, logManager.RecoveryPointCheckpointFile)).read()
-    assertEquals("Recovery point should equal checkpoint", checkpoints(topicA), logA.recoveryPoint)
-    assertEquals("Recovery point should equal checkpoint", checkpoints(topicB), logB.recoveryPoint)
+
+    topicAndPartitions.zip(logs).foreach {
+      case(tp, log) => {
+        assertEquals("Recovery point should equal checkpoint", checkpoints(tp), log.recoveryPoint)
+      }
+    }
   }
 }
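
A note on the trailing-slash test: java.io.File normalizes its path in the
constructor and drops trailing separators, which (assuming that JVM
behavior) is why a dir configured as "/path/to/logs/" resolves to the same
absolute key as "/path/to/logs". A standalone sketch:

    import java.io.File

    object TrailingSlash extends App {
      val base = new File(System.getProperty("java.io.tmpdir"), "kafka-logs")
      // The File constructor strips the trailing separator, so both
      // spellings resolve to the same absolute path.
      val withSlash = new File(base.getAbsolutePath + File.separator)
      println(withSlash.getAbsolutePath == base.getAbsolutePath) // true
    }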

http://git-wip-us.apache.org/repos/asf/kafka/blob/97f13ec7/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
index bbfb01e..4ea0489 100644
--- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
+++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -6,32 +6,35 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package kafka.log4j
 
-import java.util.Properties
-import java.io.File
 import kafka.consumer.SimpleConsumer
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{TestUtils, Utils, Logging}
-import junit.framework.Assert._
 import kafka.api.FetchRequestBuilder
 import kafka.producer.async.MissingConfigException
 import kafka.serializer.Encoder
 import kafka.zk.ZooKeeperTestHarness
+
+import java.util.Properties
+import java.io.File
+
 import org.apache.log4j.spi.LoggingEvent
 import org.apache.log4j.{PropertyConfigurator, Logger}
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnit3Suite
 
+import junit.framework.Assert._
+
 class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
   var logDirZk: File = null
@@ -56,7 +59,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     logDirZk = new File(logDirZkPath)
     config = new KafkaConfig(propsZk)
     server = TestUtils.createServer(config)
-    simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "")
+    simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64 * 1024, "")
   }
 
   @After
@@ -73,15 +76,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     var props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.Topic", "test-topic")
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
-    }catch {
+    } catch {
       case e: MissingConfigException =>
     }
 
@@ -89,15 +92,15 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with
     props = new Properties()
     props.put("log4j.rootLogger", "INFO")
     props.put("log4j.appender.KAFKA", "kafka.producer.KafkaLog4jAppender")
-    props.put("log4j.appender.KAFKA.layout","org.apache.log4j.PatternLayout")
-    props.put("log4j.appender.KAFKA.layout.ConversionPattern","%-5p: %c - %m%n")
+    props.put("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout")
+    props.put("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n")
     props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config)))
     props.put("log4j.logger.kafka.log4j", "INFO, KAFKA")
 
     try {
       PropertyConfigurator.configure(props)
       fail("Missing properties exception was expected !")
-    }catch {
+    } catch {
       case e: MissingConfigException =>
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/97f13ec7/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index a71e48d..41ebc7a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  * 
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,29 +17,59 @@
 
 package kafka.server
 
-import org.scalatest.junit.JUnit3Suite
-import org.junit.Test
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
+import kafka.log.{CleanerConfig, LogManager, LogConfig}
+
 import java.util.concurrent.atomic.AtomicBoolean
 import java.io.File
+
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
-import kafka.cluster.Replica
-import kafka.log.{LogManager, LogConfig, Log}
+import org.scalatest.junit.JUnit3Suite
+import org.junit.Test
 
 class ReplicaManagerTest extends JUnit3Suite {
+
+  val topic = "test-topic"
+
   @Test
   def testHighWaterMarkDirectoryMapping() {
     val props = TestUtils.createBrokerConfig(1)
-    val topic = "test-topic"
     val config = new KafkaConfig(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val mockLogMgr = EasyMock.createMock(classOf[LogManager])
+    val mockLogMgr = createLogManager(config.logDirs.map(new File(_)).toArray)
     val time: MockTime = new MockTime()
     val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
     val partition = rm.getOrCreatePartition(topic, 1, 1)
-    val logFilename = config.logDirs.head + File.separator + topic + "-1"
-    partition.addReplicaIfNotExists(new Replica(1, partition, time, 0L, Option(new Log(new File(logFilename), new LogConfig(), 0L, null))))
+    partition.getOrCreateReplica(1)
     rm.checkpointHighWatermarks()
   }
+
+  @Test
+  def testHighwaterMarkRelativeDirectoryMapping() {
+    val props = TestUtils.createBrokerConfig(1)
+    props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+    val config = new KafkaConfig(props)
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    val mockLogMgr = createLogManager(config.logDirs.map(new File(_)).toArray)
+    val time: MockTime = new MockTime()
+    val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
+    val partition = rm.getOrCreatePartition(topic, 1, 1)
+    partition.getOrCreateReplica(1)
+    rm.checkpointHighWatermarks()
+  }
+
+  private def createLogManager(logDirs: Array[File]): LogManager = {
+    val time = new MockTime()
+    return new LogManager(logDirs,
+      topicConfigs = Map(),
+      defaultConfig = new LogConfig(),
+      cleanerConfig = CleanerConfig(enableCleaner = false),
+      flushCheckMs = 1000L,
+      flushCheckpointMs = 100000L,
+      retentionCheckMs = 1000L,
+      scheduler = time.scheduler,
+      time = time)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/97f13ec7/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index e31fb90..4bd5964 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -89,6 +89,16 @@ object TestUtils extends Logging {
   }
 
   /**
+   * Create a temporary relative directory
+   */
+  def tempRelativeDir(parent: String): File = {
+    val f = new File(parent, "kafka-" + random.nextInt(1000000))
+    f.mkdirs()
+    f.deleteOnExit()
+    f
+  }
+
+  /**
    * Create a temporary file
    */
   def tempFile(): File = {
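
The new tempRelativeDir helper backs the relative-directory tests above;
usage, taken from ReplicaManagerTest in this patch:

    // Creates e.g. data/kafka-123456 under the current working directory
    // and registers it for deletion on JVM exit.
    props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)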

